4410: crunch-job fixups from code review.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "/usr/bin/docker.io";
130 GetOptions('force-unlock' => \$force_unlock,
131            'git-dir=s' => \$git_dir,
132            'job=s' => \$jobspec,
133            'job-api-token=s' => \$job_api_token,
134            'no-clear-tmp' => \$no_clear_tmp,
135            'resume-stash=s' => \$resume_stash,
136            'docker-bin=s' => \$docker_bin,
137     );
138
139 if (defined $job_api_token) {
140   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
141 }
142
143 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
144
145
146 $SIG{'USR1'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 1;
149 };
150 $SIG{'USR2'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 0;
153 };
154
155 my $arv = Arvados->new('apiVersion' => 'v1');
156
157 my $Job;
158 my $job_id;
159 my $dbh;
160 my $sth;
161 my @jobstep;
162
163 my $local_job;
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   $local_job = 0;
169 }
170 else
171 {
172   $Job = JSON::decode_json($jobspec);
173   $local_job = 1;
174 }
175
176
177 # Make sure our workers (our slurm nodes, localhost, or whatever) are
178 # at least able to run basic commands: they aren't down or severely
179 # misconfigured.
180 my $cmd = ['true'];
181 if ($Job->{docker_image_locator}) {
182   $cmd = [$docker_bin, 'ps', '-q'];
183 }
184 Log(undef, "Sanity check is `@$cmd`");
185 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
186      $cmd,
187      {fork => 1});
188 if ($? != 0) {
189   Log(undef, "Sanity check failed: ".exit_status_s($?));
190   exit EX_TEMPFAIL;
191 }
192 Log(undef, "Sanity check OK");
193
194
195 my $User = api_call("users/current");
196
197 if (!$local_job) {
198   if (!$force_unlock) {
199     # Claim this job, and make sure nobody else does
200     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
201     if ($@) {
202       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
203       exit EX_TEMPFAIL;
204     };
205   }
206 }
207 else
208 {
209   if (!$resume_stash)
210   {
211     map { croak ("No $_ specified") unless $Job->{$_} }
212     qw(script script_version script_parameters);
213   }
214
215   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
216   $Job->{'started_at'} = gmtime;
217   $Job->{'state'} = 'Running';
218
219   $Job = api_call("jobs/create", job => $Job);
220 }
221 $job_id = $Job->{'uuid'};
222
223 my $keep_logfile = $job_id . '.log.txt';
224 log_writer_start($keep_logfile);
225
226 $Job->{'runtime_constraints'} ||= {};
227 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
228 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
229
230 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
231 if ($? == 0) {
232   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
233   chomp($gem_versions);
234   chop($gem_versions);  # Closing parentheses
235 } else {
236   $gem_versions = "";
237 }
238 Log(undef,
239     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
240
241 Log (undef, "check slurm allocation");
242 my @slot;
243 my @node;
244 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
245 my @sinfo;
246 if (!$have_slurm)
247 {
248   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
249   push @sinfo, "$localcpus localhost";
250 }
251 if (exists $ENV{SLURM_NODELIST})
252 {
253   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
254 }
255 foreach (@sinfo)
256 {
257   my ($ncpus, $slurm_nodelist) = split;
258   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
259
260   my @nodelist;
261   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
262   {
263     my $nodelist = $1;
264     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
265     {
266       my $ranges = $1;
267       foreach (split (",", $ranges))
268       {
269         my ($a, $b);
270         if (/(\d+)-(\d+)/)
271         {
272           $a = $1;
273           $b = $2;
274         }
275         else
276         {
277           $a = $_;
278           $b = $_;
279         }
280         push @nodelist, map {
281           my $n = $nodelist;
282           $n =~ s/\[[-,\d]+\]/$_/;
283           $n;
284         } ($a..$b);
285       }
286     }
287     else
288     {
289       push @nodelist, $nodelist;
290     }
291   }
292   foreach my $nodename (@nodelist)
293   {
294     Log (undef, "node $nodename - $ncpus slots");
295     my $node = { name => $nodename,
296                  ncpus => $ncpus,
297                  losing_streak => 0,
298                  hold_until => 0 };
299     foreach my $cpu (1..$ncpus)
300     {
301       push @slot, { node => $node,
302                     cpu => $cpu };
303     }
304   }
305   push @node, @nodelist;
306 }
307
308
309
310 # Ensure that we get one jobstep running on each allocated node before
311 # we start overloading nodes with concurrent steps
312
313 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
314
315
316 $Job->update_attributes(
317   'tasks_summary' => { 'failed' => 0,
318                        'todo' => 1,
319                        'running' => 0,
320                        'done' => 0 });
321
322 Log (undef, "start");
323 $SIG{'INT'} = sub { $main::please_freeze = 1; };
324 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
325 $SIG{'TERM'} = \&croak;
326 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
327 $SIG{'ALRM'} = sub { $main::please_info = 1; };
328 $SIG{'CONT'} = sub { $main::please_continue = 1; };
329 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
330
331 $main::please_freeze = 0;
332 $main::please_info = 0;
333 $main::please_continue = 0;
334 $main::please_refresh = 0;
335 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
336
337 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
338 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
339 $ENV{"JOB_UUID"} = $job_id;
340
341
342 my @jobstep_todo = ();
343 my @jobstep_done = ();
344 my @jobstep_tomerge = ();
345 my $jobstep_tomerge_level = 0;
346 my $squeue_checked = 0;
347 my $latest_refresh = scalar time;
348
349
350
351 if (defined $Job->{thawedfromkey})
352 {
353   thaw ($Job->{thawedfromkey});
354 }
355 else
356 {
357   my $first_task = api_call("job_tasks/create", job_task => {
358     'job_uuid' => $Job->{'uuid'},
359     'sequence' => 0,
360     'qsequence' => 0,
361     'parameters' => {},
362   });
363   push @jobstep, { 'level' => 0,
364                    'failures' => 0,
365                    'arvados_task' => $first_task,
366                  };
367   push @jobstep_todo, 0;
368 }
369
370
371 if (!$have_slurm)
372 {
373   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
374 }
375
376 my $build_script = handle_readall(\*DATA);
377 my $nodelist = join(",", @node);
378 my $git_tar_count = 0;
379
380 if (!defined $no_clear_tmp) {
381   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
382   Log (undef, "Clean work dirs");
383
384   my $cleanpid = fork();
385   if ($cleanpid == 0)
386   {
387     # Find FUSE mounts that look like Keep mounts (the mount path has the
388     # word "keep") and unmount them.  Then clean up work directories.
389     # TODO: When #5036 is done and widely deployed, we can get rid of the
390     # regular expression and just unmount everything with type fuse.keep.
391     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
392           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
393     exit (1);
394   }
395   while (1)
396   {
397     last if $cleanpid == waitpid (-1, WNOHANG);
398     freeze_if_want_freeze ($cleanpid);
399     select (undef, undef, undef, 0.1);
400   }
401   Log (undef, "Cleanup command exited ".exit_status_s($?));
402 }
403
404 # If this job requires a Docker image, install that.
405 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
406 if ($docker_locator = $Job->{docker_image_locator}) {
407   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
408   if (!$docker_hash)
409   {
410     croak("No Docker image hash found from locator $docker_locator");
411   }
412   $docker_stream =~ s/^\.//;
413   my $docker_install_script = qq{
414 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
415     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
416 fi
417 };
418   my $docker_pid = fork();
419   if ($docker_pid == 0)
420   {
421     srun (["srun", "--nodelist=" . join(',', @node)],
422           ["/bin/sh", "-ec", $docker_install_script]);
423     exit ($?);
424   }
425   while (1)
426   {
427     last if $docker_pid == waitpid (-1, WNOHANG);
428     freeze_if_want_freeze ($docker_pid);
429     select (undef, undef, undef, 0.1);
430   }
431   if ($? != 0)
432   {
433     croak("Installing Docker image from $docker_locator exited "
434           .exit_status_s($?));
435   }
436
437   # Determine whether this version of Docker supports memory+swap limits.
438   srun(["srun", "--nodelist=" . $node[0]],
439        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
440       {fork => 1});
441   $docker_limitmem = ($? == 0);
442
443   if ($Job->{arvados_sdk_version}) {
444     # The job also specifies an Arvados SDK version.  Add the SDKs to the
445     # tar file for the build script to install.
446     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
447                        $Job->{arvados_sdk_version}));
448     add_git_archive("git", "--git-dir=$git_dir", "archive",
449                     "--prefix=.arvados.sdk/",
450                     $Job->{arvados_sdk_version}, "sdk");
451   }
452 }
453
454 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
455   # If script_version looks like an absolute path, *and* the --git-dir
456   # argument was not given -- which implies we were not invoked by
457   # crunch-dispatch -- we will use the given path as a working
458   # directory instead of resolving script_version to a git commit (or
459   # doing anything else with git).
460   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
461   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
462 }
463 else {
464   # Resolve the given script_version to a git commit sha1. Also, if
465   # the repository is remote, clone it into our local filesystem: this
466   # ensures "git archive" will work, and is necessary to reliably
467   # resolve a symbolic script_version like "master^".
468   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
469
470   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
471
472   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
473
474   # If we're running under crunch-dispatch, it will have already
475   # pulled the appropriate source tree into its own repository, and
476   # given us that repo's path as $git_dir.
477   #
478   # If we're running a "local" job, we might have to fetch content
479   # from a remote repository.
480   #
481   # (Currently crunch-dispatch gives a local path with --git-dir, but
482   # we might as well accept URLs there too in case it changes its
483   # mind.)
484   my $repo = $git_dir || $Job->{'repository'};
485
486   # Repository can be remote or local. If remote, we'll need to fetch it
487   # to a local dir before doing `git log` et al.
488   my $repo_location;
489
490   if ($repo =~ m{://|^[^/]*:}) {
491     # $repo is a git url we can clone, like git:// or https:// or
492     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
493     # not recognized here because distinguishing that from a local
494     # path is too fragile. If you really need something strange here,
495     # use the ssh:// form.
496     $repo_location = 'remote';
497   } elsif ($repo =~ m{^\.*/}) {
498     # $repo is a local path to a git index. We'll also resolve ../foo
499     # to ../foo/.git if the latter is a directory. To help
500     # disambiguate local paths from named hosted repositories, this
501     # form must be given as ./ or ../ if it's a relative path.
502     if (-d "$repo/.git") {
503       $repo = "$repo/.git";
504     }
505     $repo_location = 'local';
506   } else {
507     # $repo is none of the above. It must be the name of a hosted
508     # repository.
509     my $arv_repo_list = api_call("repositories/list",
510                                  'filters' => [['name','=',$repo]]);
511     my @repos_found = @{$arv_repo_list->{'items'}};
512     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
513     if ($n_found > 0) {
514       Log(undef, "Repository '$repo' -> "
515           . join(", ", map { $_->{'uuid'} } @repos_found));
516     }
517     if ($n_found != 1) {
518       croak("Error: Found $n_found repositories with name '$repo'.");
519     }
520     $repo = $repos_found[0]->{'fetch_url'};
521     $repo_location = 'remote';
522   }
523   Log(undef, "Using $repo_location repository '$repo'");
524   $ENV{"CRUNCH_SRC_URL"} = $repo;
525
526   # Resolve given script_version (we'll call that $treeish here) to a
527   # commit sha1 ($commit).
528   my $treeish = $Job->{'script_version'};
529   my $commit;
530   if ($repo_location eq 'remote') {
531     # We minimize excess object-fetching by re-using the same bare
532     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
533     # just keep adding remotes to it as needed.
534     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
535     my $gitcmd = "git --git-dir=\Q$local_repo\E";
536
537     # Set up our local repo for caching remote objects, making
538     # archives, etc.
539     if (!-d $local_repo) {
540       make_path($local_repo) or croak("Error: could not create $local_repo");
541     }
542     # This works (exits 0 and doesn't delete fetched objects) even
543     # if $local_repo is already initialized:
544     `$gitcmd init --bare`;
545     if ($?) {
546       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
547     }
548
549     # If $treeish looks like a hash (or abbrev hash) we look it up in
550     # our local cache first, since that's cheaper. (We don't want to
551     # do that with tags/branches though -- those change over time, so
552     # they should always be resolved by the remote repo.)
553     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
554       # Hide stderr because it's normal for this to fail:
555       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
556       if ($? == 0 &&
557           # Careful not to resolve a branch named abcdeff to commit 1234567:
558           $sha1 =~ /^$treeish/ &&
559           $sha1 =~ /^([0-9a-f]{40})$/s) {
560         $commit = $1;
561         Log(undef, "Commit $commit already present in $local_repo");
562       }
563     }
564
565     if (!defined $commit) {
566       # If $treeish isn't just a hash or abbrev hash, or isn't here
567       # yet, we need to fetch the remote to resolve it correctly.
568
569       # First, remove all local heads. This prevents a name that does
570       # not exist on the remote from resolving to (or colliding with)
571       # a previously fetched branch or tag (possibly from a different
572       # remote).
573       remove_tree("$local_repo/refs/heads", {keep_root => 1});
574
575       Log(undef, "Fetching objects from $repo to $local_repo");
576       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
577       if ($?) {
578         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
579       }
580     }
581
582     # Now that the data is all here, we will use our local repo for
583     # the rest of our git activities.
584     $repo = $local_repo;
585   }
586
587   my $gitcmd = "git --git-dir=\Q$repo\E";
588   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
589   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
590     croak("`$gitcmd rev-list` exited "
591           .exit_status_s($?)
592           .", '$treeish' not found. Giving up.");
593   }
594   $commit = $1;
595   Log(undef, "Version $treeish is commit $commit");
596
597   if ($commit ne $Job->{'script_version'}) {
598     # Record the real commit id in the database, frozentokey, logs,
599     # etc. -- instead of an abbreviation or a branch name which can
600     # become ambiguous or point to a different commit in the future.
601     if (!$Job->update_attributes('script_version' => $commit)) {
602       croak("Error: failed to update job's script_version attribute");
603     }
604   }
605
606   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
607   add_git_archive("$gitcmd archive ''\Q$commit\E");
608 }
609
610 my $git_archive = combined_git_archive();
611 if (!defined $git_archive) {
612   Log(undef, "Skip install phase (no git archive)");
613   if ($have_slurm) {
614     Log(undef, "Warning: This probably means workers have no source tree!");
615   }
616 }
617 else {
618   my $install_exited;
619   my $install_script_tries_left = 3;
620   for (my $attempts = 0; $attempts < 3; $attempts++) {
621     Log(undef, "Run install script on all workers");
622
623     my @srunargs = ("srun",
624                     "--nodelist=$nodelist",
625                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
626     my @execargs = ("sh", "-c",
627                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
628
629     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
630     my ($install_stderr_r, $install_stderr_w);
631     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
632     set_nonblocking($install_stderr_r);
633     my $installpid = fork();
634     if ($installpid == 0)
635     {
636       close($install_stderr_r);
637       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
638       open(STDOUT, ">&", $install_stderr_w);
639       open(STDERR, ">&", $install_stderr_w);
640       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
641       exit (1);
642     }
643     close($install_stderr_w);
644     # Tell freeze_if_want_freeze how to kill the child, otherwise the
645     # "waitpid(installpid)" loop won't get interrupted by a freeze:
646     $proc{$installpid} = {};
647     my $stderr_buf = '';
648     # Track whether anything appears on stderr other than slurm errors
649     # ("srun: ...") and the "starting: ..." message printed by the
650     # srun subroutine itself:
651     my $stderr_anything_from_script = 0;
652     my $match_our_own_errors = '^(srun: error: |starting: \[)';
653     while ($installpid != waitpid(-1, WNOHANG)) {
654       freeze_if_want_freeze ($installpid);
655       # Wait up to 0.1 seconds for something to appear on stderr, then
656       # do a non-blocking read.
657       my $bits = fhbits($install_stderr_r);
658       select ($bits, undef, $bits, 0.1);
659       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
660       {
661         while ($stderr_buf =~ /^(.*?)\n/) {
662           my $line = $1;
663           substr $stderr_buf, 0, 1+length($line), "";
664           Log(undef, "stderr $line");
665           if ($line !~ /$match_our_own_errors/) {
666             $stderr_anything_from_script = 1;
667           }
668         }
669       }
670     }
671     delete $proc{$installpid};
672     $install_exited = $?;
673     close($install_stderr_r);
674     if (length($stderr_buf) > 0) {
675       if ($stderr_buf !~ /$match_our_own_errors/) {
676         $stderr_anything_from_script = 1;
677       }
678       Log(undef, "stderr $stderr_buf")
679     }
680
681     Log (undef, "Install script exited ".exit_status_s($install_exited));
682     last if $install_exited == 0 || $main::please_freeze;
683     # If the install script fails but doesn't print an error message,
684     # the next thing anyone is likely to do is just run it again in
685     # case it was a transient problem like "slurm communication fails
686     # because the network isn't reliable enough". So we'll just do
687     # that ourselves (up to 3 attempts in total). OTOH, if there is an
688     # error message, the problem is more likely to have a real fix and
689     # we should fail the job so the fixing process can start, instead
690     # of doing 2 more attempts.
691     last if $stderr_anything_from_script;
692   }
693
694   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
695     unlink($tar_filename);
696   }
697
698   if ($install_exited != 0) {
699     croak("Giving up");
700   }
701 }
702
703 foreach (qw (script script_version script_parameters runtime_constraints))
704 {
705   Log (undef,
706        "$_ " .
707        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
708 }
709 foreach (split (/\n/, $Job->{knobs}))
710 {
711   Log (undef, "knob " . $_);
712 }
713
714
715
716 $main::success = undef;
717
718
719
720 ONELEVEL:
721
722 my $thisround_succeeded = 0;
723 my $thisround_failed = 0;
724 my $thisround_failed_multiple = 0;
725 my $working_slot_count = scalar(@slot);
726
727 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
728                        or $a <=> $b } @jobstep_todo;
729 my $level = $jobstep[$jobstep_todo[0]]->{level};
730
731 my $initial_tasks_this_level = 0;
732 foreach my $id (@jobstep_todo) {
733   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
734 }
735
736 # If the number of tasks scheduled at this level #T is smaller than the number
737 # of slots available #S, only use the first #T slots, or the first slot on
738 # each node, whichever number is greater.
739 #
740 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
741 # based on these numbers.  Using fewer slots makes more resources available
742 # to each individual task, which should normally be a better strategy when
743 # there are fewer of them running with less parallelism.
744 #
745 # Note that this calculation is not redone if the initial tasks at
746 # this level queue more tasks at the same level.  This may harm
747 # overall task throughput for that level.
748 my @freeslot;
749 if ($initial_tasks_this_level < @node) {
750   @freeslot = (0..$#node);
751 } elsif ($initial_tasks_this_level < @slot) {
752   @freeslot = (0..$initial_tasks_this_level - 1);
753 } else {
754   @freeslot = (0..$#slot);
755 }
756 my $round_num_freeslots = scalar(@freeslot);
757
758 my %round_max_slots = ();
759 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
760   my $this_slot = $slot[$freeslot[$ii]];
761   my $node_name = $this_slot->{node}->{name};
762   $round_max_slots{$node_name} ||= $this_slot->{cpu};
763   last if (scalar(keys(%round_max_slots)) >= @node);
764 }
765
766 Log(undef, "start level $level with $round_num_freeslots slots");
767 my @holdslot;
768 my %reader;
769 my $progress_is_dirty = 1;
770 my $progress_stats_updated = 0;
771
772 update_progress_stats();
773
774
775 THISROUND:
776 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
777 {
778   my $id = $jobstep_todo[$todo_ptr];
779   my $Jobstep = $jobstep[$id];
780   if ($Jobstep->{level} != $level)
781   {
782     next;
783   }
784
785   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
786   set_nonblocking($reader{$id});
787
788   my $childslot = $freeslot[0];
789   my $childnode = $slot[$childslot]->{node};
790   my $childslotname = join (".",
791                             $slot[$childslot]->{node}->{name},
792                             $slot[$childslot]->{cpu});
793
794   my $childpid = fork();
795   if ($childpid == 0)
796   {
797     $SIG{'INT'} = 'DEFAULT';
798     $SIG{'QUIT'} = 'DEFAULT';
799     $SIG{'TERM'} = 'DEFAULT';
800
801     foreach (values (%reader))
802     {
803       close($_);
804     }
805     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
806     open(STDOUT,">&writer");
807     open(STDERR,">&writer");
808
809     undef $dbh;
810     undef $sth;
811
812     delete $ENV{"GNUPGHOME"};
813     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
814     $ENV{"TASK_QSEQUENCE"} = $id;
815     $ENV{"TASK_SEQUENCE"} = $level;
816     $ENV{"JOB_SCRIPT"} = $Job->{script};
817     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
818       $param =~ tr/a-z/A-Z/;
819       $ENV{"JOB_PARAMETER_$param"} = $value;
820     }
821     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
822     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
823     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
824     $ENV{"HOME"} = $ENV{"TASK_WORK"};
825     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
826     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
827     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
828     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
829
830     $ENV{"GZIP"} = "-n";
831
832     my @srunargs = (
833       "srun",
834       "--nodelist=".$childnode->{name},
835       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
836       "--job-name=$job_id.$id.$$",
837         );
838     my $command =
839         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
840         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
841         ."&& cd $ENV{CRUNCH_TMP} "
842         # These environment variables get used explicitly later in
843         # $command.  No tool is expected to read these values directly.
844         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
845         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
846         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
847         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
848     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
849     if ($docker_hash)
850     {
851       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
852       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
853       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
854       # We only set memory limits if Docker lets us limit both memory and swap.
855       # Memory limits alone have been supported longer, but subprocesses tend
856       # to get SIGKILL if they exceed that without any swap limit set.
857       # See #5642 for additional background.
858       if ($docker_limitmem) {
859         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
860       }
861
862       # Dynamically configure the container to use the host system as its
863       # DNS server.  Get the host's global addresses from the ip command,
864       # and turn them into docker --dns options using gawk.
865       $command .=
866           q{$(ip -o address show scope global |
867               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
868
869       # The source tree and $destdir directory (which we have
870       # installed on the worker host) are available in the container,
871       # under the same path.
872       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
873       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
874
875       # Currently, we make arv-mount's mount point appear at /keep
876       # inside the container (instead of using the same path as the
877       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
878       # crunch scripts and utilities must not rely on this. They must
879       # use $TASK_KEEPMOUNT.
880       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
881       $ENV{TASK_KEEPMOUNT} = "/keep";
882
883       # TASK_WORK is almost exactly like a docker data volume: it
884       # starts out empty, is writable, and persists until no
885       # containers use it any more. We don't use --volumes-from to
886       # share it with other containers: it is only accessible to this
887       # task, and it goes away when this task stops.
888       #
889       # However, a docker data volume is writable only by root unless
890       # the mount point already happens to exist in the container with
891       # different permissions. Therefore, we [1] assume /tmp already
892       # exists in the image and is writable by the crunch user; [2]
893       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
894       # writable if they are created by docker while setting up the
895       # other --volumes); and [3] create $TASK_WORK inside the
896       # container using $build_script.
897       $command .= "--volume=/tmp ";
898       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
899       $ENV{"HOME"} = $ENV{"TASK_WORK"};
900       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
901
902       # TODO: Share a single JOB_WORK volume across all task
903       # containers on a given worker node, and delete it when the job
904       # ends (and, in case that doesn't work, when the next job
905       # starts).
906       #
907       # For now, use the same approach as TASK_WORK above.
908       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
909
910       while (my ($env_key, $env_val) = each %ENV)
911       {
912         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
913           $command .= "--env=\Q$env_key=$env_val\E ";
914         }
915       }
916       $command .= "--env=\QHOME=$ENV{HOME}\E ";
917       $command .= "\Q$docker_hash\E ";
918       $command .= "stdbuf --output=0 --error=0 ";
919       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
920     } else {
921       # Non-docker run
922       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
923       $command .= "stdbuf --output=0 --error=0 ";
924       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
925     }
926
927     my @execargs = ('bash', '-c', $command);
928     srun (\@srunargs, \@execargs, undef, $build_script);
929     # exec() failed, we assume nothing happened.
930     die "srun() failed on build script\n";
931   }
932   close("writer");
933   if (!defined $childpid)
934   {
935     close $reader{$id};
936     delete $reader{$id};
937     next;
938   }
939   shift @freeslot;
940   $proc{$childpid} = { jobstep => $id,
941                        time => time,
942                        slot => $childslot,
943                        jobstepname => "$job_id.$id.$childpid",
944                      };
945   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
946   $slot[$childslot]->{pid} = $childpid;
947
948   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
949   Log ($id, "child $childpid started on $childslotname");
950   $Jobstep->{starttime} = time;
951   $Jobstep->{node} = $childnode->{name};
952   $Jobstep->{slotindex} = $childslot;
953   delete $Jobstep->{stderr};
954   delete $Jobstep->{finishtime};
955
956   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
957   $Jobstep->{'arvados_task'}->save;
958
959   splice @jobstep_todo, $todo_ptr, 1;
960   --$todo_ptr;
961
962   $progress_is_dirty = 1;
963
964   while (!@freeslot
965          ||
966          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
967   {
968     last THISROUND if $main::please_freeze || defined($main::success);
969     if ($main::please_info)
970     {
971       $main::please_info = 0;
972       freeze();
973       create_output_collection();
974       save_meta(1);
975       update_progress_stats();
976     }
977     my $gotsome
978         = readfrompipes ()
979         + reapchildren ();
980     if (!$gotsome)
981     {
982       check_refresh_wanted();
983       check_squeue();
984       update_progress_stats();
985       select (undef, undef, undef, 0.1);
986     }
987     elsif (time - $progress_stats_updated >= 30)
988     {
989       update_progress_stats();
990     }
991     $working_slot_count = scalar(grep { $_->{node}->{losing_streak} == 0 &&
992                                         $_->{node}->{hold_count} < 4 } @slot);
993     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
994         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
995     {
996       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
997           .($thisround_failed+$thisround_succeeded)
998           .") -- giving up on this round";
999       Log (undef, $message);
1000       last THISROUND;
1001     }
1002
1003     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1004     for (my $i=$#freeslot; $i>=0; $i--) {
1005       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1006         push @holdslot, (splice @freeslot, $i, 1);
1007       }
1008     }
1009     for (my $i=$#holdslot; $i>=0; $i--) {
1010       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1011         push @freeslot, (splice @holdslot, $i, 1);
1012       }
1013     }
1014
1015     # give up if no nodes are succeeding
1016     if ($working_slot_count < 1) {
1017       Log(undef, "Every node has failed -- giving up");
1018       last THISROUND;
1019     }
1020   }
1021 }
1022
1023
1024 push @freeslot, splice @holdslot;
1025 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1026
1027
1028 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1029 while (%proc)
1030 {
1031   if ($main::please_continue) {
1032     $main::please_continue = 0;
1033     goto THISROUND;
1034   }
1035   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1036   readfrompipes ();
1037   if (!reapchildren())
1038   {
1039     check_refresh_wanted();
1040     check_squeue();
1041     update_progress_stats();
1042     select (undef, undef, undef, 0.1);
1043     killem (keys %proc) if $main::please_freeze;
1044   }
1045 }
1046
1047 update_progress_stats();
1048 freeze_if_want_freeze();
1049
1050
1051 if (!defined $main::success)
1052 {
1053   if (!@jobstep_todo) {
1054     $main::success = 1;
1055   } elsif ($working_slot_count < 1) {
1056     save_output_collection();
1057     save_meta();
1058     exit(EX_RETRY_UNLOCKED);
1059   } elsif ($thisround_succeeded == 0 &&
1060            ($thisround_failed == 0 || $thisround_failed > 4)) {
1061     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1062     Log (undef, $message);
1063     $main::success = 0;
1064   }
1065 }
1066
1067 goto ONELEVEL if !defined $main::success;
1068
1069
1070 release_allocation();
1071 freeze();
1072 my $collated_output = save_output_collection();
1073 Log (undef, "finish");
1074
1075 save_meta();
1076
1077 my $final_state;
1078 if ($collated_output && $main::success) {
1079   $final_state = 'Complete';
1080 } else {
1081   $final_state = 'Failed';
1082 }
1083 $Job->update_attributes('state' => $final_state);
1084
1085 exit (($final_state eq 'Complete') ? 0 : 1);
1086
1087
1088
1089 sub update_progress_stats
1090 {
1091   $progress_stats_updated = time;
1092   return if !$progress_is_dirty;
1093   my ($todo, $done, $running) = (scalar @jobstep_todo,
1094                                  scalar @jobstep_done,
1095                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1096   $Job->{'tasks_summary'} ||= {};
1097   $Job->{'tasks_summary'}->{'todo'} = $todo;
1098   $Job->{'tasks_summary'}->{'done'} = $done;
1099   $Job->{'tasks_summary'}->{'running'} = $running;
1100   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1101   Log (undef, "status: $done done, $running running, $todo todo");
1102   $progress_is_dirty = 0;
1103 }
1104
1105
1106
1107 sub reapchildren
1108 {
1109   my $pid = waitpid (-1, WNOHANG);
1110   return 0 if $pid <= 0;
1111
1112   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1113                   . "."
1114                   . $slot[$proc{$pid}->{slot}]->{cpu});
1115   my $jobstepid = $proc{$pid}->{jobstep};
1116   my $elapsed = time - $proc{$pid}->{time};
1117   my $Jobstep = $jobstep[$jobstepid];
1118
1119   my $childstatus = $?;
1120   my $exitvalue = $childstatus >> 8;
1121   my $exitinfo = "exit ".exit_status_s($childstatus);
1122   $Jobstep->{'arvados_task'}->reload;
1123   my $task_success = $Jobstep->{'arvados_task'}->{success};
1124
1125   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1126
1127   if (!defined $task_success) {
1128     # task did not indicate one way or the other --> fail
1129     $Jobstep->{'arvados_task'}->{success} = 0;
1130     $Jobstep->{'arvados_task'}->save;
1131     $task_success = 0;
1132   }
1133
1134   if (!$task_success)
1135   {
1136     my $temporary_fail;
1137     $temporary_fail ||= $Jobstep->{node_fail};
1138     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1139
1140     ++$thisround_failed;
1141     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1142
1143     # Check for signs of a failed or misconfigured node
1144     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1145         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1146       # Don't count this against jobstep failure thresholds if this
1147       # node is already suspected faulty and srun exited quickly
1148       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1149           $elapsed < 5) {
1150         Log ($jobstepid, "blaming failure on suspect node " .
1151              $slot[$proc{$pid}->{slot}]->{node}->{name});
1152         $temporary_fail ||= 1;
1153       }
1154       ban_node_by_slot($proc{$pid}->{slot});
1155     }
1156
1157     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1158                              ++$Jobstep->{'failures'},
1159                              $temporary_fail ? 'temporary' : 'permanent',
1160                              $elapsed));
1161
1162     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1163       # Give up on this task, and the whole job
1164       $main::success = 0;
1165     }
1166     # Put this task back on the todo queue
1167     push @jobstep_todo, $jobstepid;
1168     $Job->{'tasks_summary'}->{'failed'}++;
1169   }
1170   else
1171   {
1172     ++$thisround_succeeded;
1173     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1174     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1175     push @jobstep_done, $jobstepid;
1176     Log ($jobstepid, "success in $elapsed seconds");
1177   }
1178   $Jobstep->{exitcode} = $childstatus;
1179   $Jobstep->{finishtime} = time;
1180   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1181   $Jobstep->{'arvados_task'}->save;
1182   process_stderr ($jobstepid, $task_success);
1183   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1184                            length($Jobstep->{'arvados_task'}->{output}),
1185                            $Jobstep->{'arvados_task'}->{output}));
1186
1187   close $reader{$jobstepid};
1188   delete $reader{$jobstepid};
1189   delete $slot[$proc{$pid}->{slot}]->{pid};
1190   push @freeslot, $proc{$pid}->{slot};
1191   delete $proc{$pid};
1192
1193   if ($task_success) {
1194     # Load new tasks
1195     my $newtask_list = [];
1196     my $newtask_results;
1197     do {
1198       $newtask_results = api_call(
1199         "job_tasks/list",
1200         'where' => {
1201           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1202         },
1203         'order' => 'qsequence',
1204         'offset' => scalar(@$newtask_list),
1205       );
1206       push(@$newtask_list, @{$newtask_results->{items}});
1207     } while (@{$newtask_results->{items}});
1208     foreach my $arvados_task (@$newtask_list) {
1209       my $jobstep = {
1210         'level' => $arvados_task->{'sequence'},
1211         'failures' => 0,
1212         'arvados_task' => $arvados_task
1213       };
1214       push @jobstep, $jobstep;
1215       push @jobstep_todo, $#jobstep;
1216     }
1217   }
1218
1219   $progress_is_dirty = 1;
1220   1;
1221 }
1222
1223 sub check_refresh_wanted
1224 {
1225   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1226   if (@stat && $stat[9] > $latest_refresh) {
1227     $latest_refresh = scalar time;
1228     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1229     for my $attr ('cancelled_at',
1230                   'cancelled_by_user_uuid',
1231                   'cancelled_by_client_uuid',
1232                   'state') {
1233       $Job->{$attr} = $Job2->{$attr};
1234     }
1235     if ($Job->{'state'} ne "Running") {
1236       if ($Job->{'state'} eq "Cancelled") {
1237         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1238       } else {
1239         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1240       }
1241       $main::success = 0;
1242       $main::please_freeze = 1;
1243     }
1244   }
1245 }
1246
1247 sub check_squeue
1248 {
1249   my $last_squeue_check = $squeue_checked;
1250
1251   # Do not call `squeue` or check the kill list more than once every
1252   # 15 seconds.
1253   return if $last_squeue_check > time - 15;
1254   $squeue_checked = time;
1255
1256   # Look for children from which we haven't received stderr data since
1257   # the last squeue check. If no such children exist, all procs are
1258   # alive and there's no need to even look at squeue.
1259   #
1260   # As long as the crunchstat poll interval (10s) is shorter than the
1261   # squeue check interval (15s) this should make the squeue check an
1262   # infrequent event.
1263   my $silent_procs = 0;
1264   for my $jobstep (values %proc)
1265   {
1266     if ($jobstep->{stderr_at} < $last_squeue_check)
1267     {
1268       $silent_procs++;
1269     }
1270   }
1271   return if $silent_procs == 0;
1272
1273   # use killem() on procs whose killtime is reached
1274   while (my ($pid, $jobstep) = each %proc)
1275   {
1276     if (exists $jobstep->{killtime}
1277         && $jobstep->{killtime} <= time
1278         && $jobstep->{stderr_at} < $last_squeue_check)
1279     {
1280       my $sincewhen = "";
1281       if ($jobstep->{stderr_at}) {
1282         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1283       }
1284       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1285       killem ($pid);
1286     }
1287   }
1288
1289   if (!$have_slurm)
1290   {
1291     # here is an opportunity to check for mysterious problems with local procs
1292     return;
1293   }
1294
1295   # Get a list of steps still running.  Note: squeue(1) says --steps
1296   # selects a format (which we override anyway) and allows us to
1297   # specify which steps we're interested in (which we don't).
1298   # Importantly, it also changes the meaning of %j from "job name" to
1299   # "step name" and (although this isn't mentioned explicitly in the
1300   # docs) switches from "one line per job" mode to "one line per step"
1301   # mode. Without it, we'd just get a list of one job, instead of a
1302   # list of N steps.
1303   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1304   if ($? != 0)
1305   {
1306     Log(undef, "warning: squeue exit status $? ($!)");
1307     return;
1308   }
1309   chop @squeue;
1310
1311   # which of my jobsteps are running, according to squeue?
1312   my %ok;
1313   for my $jobstepname (@squeue)
1314   {
1315     $ok{$jobstepname} = 1;
1316   }
1317
1318   # Check for child procs >60s old and not mentioned by squeue.
1319   while (my ($pid, $jobstep) = each %proc)
1320   {
1321     if ($jobstep->{time} < time - 60
1322         && $jobstep->{jobstepname}
1323         && !exists $ok{$jobstep->{jobstepname}}
1324         && !exists $jobstep->{killtime})
1325     {
1326       # According to slurm, this task has ended (successfully or not)
1327       # -- but our srun child hasn't exited. First we must wait (30
1328       # seconds) in case this is just a race between communication
1329       # channels. Then, if our srun child process still hasn't
1330       # terminated, we'll conclude some slurm communication
1331       # error/delay has caused the task to die without notifying srun,
1332       # and we'll kill srun ourselves.
1333       $jobstep->{killtime} = time + 30;
1334       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1335     }
1336   }
1337 }
1338
1339
1340 sub release_allocation
1341 {
1342   if ($have_slurm)
1343   {
1344     Log (undef, "release job allocation");
1345     system "scancel $ENV{SLURM_JOB_ID}";
1346   }
1347 }
1348
1349
1350 sub readfrompipes
1351 {
1352   my $gotsome = 0;
1353   foreach my $job (keys %reader)
1354   {
1355     my $buf;
1356     while (0 < sysread ($reader{$job}, $buf, 8192))
1357     {
1358       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1359       $jobstep[$job]->{stderr_at} = time;
1360       $jobstep[$job]->{stderr} .= $buf;
1361       preprocess_stderr ($job);
1362       if (length ($jobstep[$job]->{stderr}) > 16384)
1363       {
1364         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1365       }
1366       $gotsome = 1;
1367     }
1368   }
1369   return $gotsome;
1370 }
1371
1372
1373 sub preprocess_stderr
1374 {
1375   my $job = shift;
1376
1377   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1378     my $line = $1;
1379     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1380     Log ($job, "stderr $line");
1381     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1382       # whoa.
1383       $main::please_freeze = 1;
1384     }
1385     elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1386       $jobstep[$job]->{node_fail} = 1;
1387       ban_node_by_slot($jobstep[$job]->{slotindex});
1388     }
1389   }
1390 }
1391
1392
1393 sub process_stderr
1394 {
1395   my $job = shift;
1396   my $task_success = shift;
1397   preprocess_stderr ($job);
1398
1399   map {
1400     Log ($job, "stderr $_");
1401   } split ("\n", $jobstep[$job]->{stderr});
1402 }
1403
1404 sub fetch_block
1405 {
1406   my $hash = shift;
1407   my $keep;
1408   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1409     Log(undef, "fetch_block run error from arv-get $hash: $!");
1410     return undef;
1411   }
1412   my $output_block = "";
1413   while (1) {
1414     my $buf;
1415     my $bytes = sysread($keep, $buf, 1024 * 1024);
1416     if (!defined $bytes) {
1417       Log(undef, "fetch_block read error from arv-get: $!");
1418       $output_block = undef;
1419       last;
1420     } elsif ($bytes == 0) {
1421       # sysread returns 0 at the end of the pipe.
1422       last;
1423     } else {
1424       # some bytes were read into buf.
1425       $output_block .= $buf;
1426     }
1427   }
1428   close $keep;
1429   if ($?) {
1430     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1431     $output_block = undef;
1432   }
1433   return $output_block;
1434 }
1435
1436 # Create a collection by concatenating the output of all tasks (each
1437 # task's output is either a manifest fragment, a locator for a
1438 # manifest fragment stored in Keep, or nothing at all). Return the
1439 # portable_data_hash of the new collection.
1440 sub create_output_collection
1441 {
1442   Log (undef, "collate");
1443
1444   my ($child_out, $child_in);
1445   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1446 import arvados
1447 import sys
1448 print (arvados.api("v1").collections().
1449        create(body={"manifest_text": sys.stdin.read()}).
1450        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1451 }, retry_count());
1452
1453   my $task_idx = -1;
1454   my $manifest_size = 0;
1455   for (@jobstep)
1456   {
1457     ++$task_idx;
1458     my $output = $_->{'arvados_task'}->{output};
1459     next if (!defined($output));
1460     my $next_write;
1461     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1462       $next_write = fetch_block($output);
1463     } else {
1464       $next_write = $output;
1465     }
1466     if (defined($next_write)) {
1467       if (!defined(syswrite($child_in, $next_write))) {
1468         # There's been an error writing.  Stop the loop.
1469         # We'll log details about the exit code later.
1470         last;
1471       } else {
1472         $manifest_size += length($next_write);
1473       }
1474     } else {
1475       my $uuid = $_->{'arvados_task'}->{'uuid'};
1476       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1477       $main::success = 0;
1478     }
1479   }
1480   close($child_in);
1481   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1482
1483   my $joboutput;
1484   my $s = IO::Select->new($child_out);
1485   if ($s->can_read(120)) {
1486     sysread($child_out, $joboutput, 1024 * 1024);
1487     waitpid($pid, 0);
1488     if ($?) {
1489       Log(undef, "output collection creation exited " . exit_status_s($?));
1490       $joboutput = undef;
1491     } else {
1492       chomp($joboutput);
1493     }
1494   } else {
1495     Log (undef, "timed out while creating output collection");
1496     foreach my $signal (2, 2, 2, 15, 15, 9) {
1497       kill($signal, $pid);
1498       last if waitpid($pid, WNOHANG) == -1;
1499       sleep(1);
1500     }
1501   }
1502   close($child_out);
1503
1504   return $joboutput;
1505 }
1506
1507 # Calls create_output_collection, logs the result, and returns it.
1508 # If that was successful, save that as the output in the job record.
1509 sub save_output_collection {
1510   my $collated_output = create_output_collection();
1511
1512   if (!$collated_output) {
1513     Log(undef, "Failed to write output collection");
1514   }
1515   else {
1516     Log(undef, "job output $collated_output");
1517     $Job->update_attributes('output' => $collated_output);
1518   }
1519   return $collated_output;
1520 }
1521
1522 sub killem
1523 {
1524   foreach (@_)
1525   {
1526     my $sig = 2;                # SIGINT first
1527     if (exists $proc{$_}->{"sent_$sig"} &&
1528         time - $proc{$_}->{"sent_$sig"} > 4)
1529     {
1530       $sig = 15;                # SIGTERM if SIGINT doesn't work
1531     }
1532     if (exists $proc{$_}->{"sent_$sig"} &&
1533         time - $proc{$_}->{"sent_$sig"} > 4)
1534     {
1535       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1536     }
1537     if (!exists $proc{$_}->{"sent_$sig"})
1538     {
1539       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1540       kill $sig, $_;
1541       select (undef, undef, undef, 0.1);
1542       if ($sig == 2)
1543       {
1544         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1545       }
1546       $proc{$_}->{"sent_$sig"} = time;
1547       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1548     }
1549   }
1550 }
1551
1552
1553 sub fhbits
1554 {
1555   my($bits);
1556   for (@_) {
1557     vec($bits,fileno($_),1) = 1;
1558   }
1559   $bits;
1560 }
1561
1562
1563 # Send log output to Keep via arv-put.
1564 #
1565 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1566 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1567 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1568 # $log_pipe_pid is the pid of the arv-put subprocess.
1569 #
1570 # The only functions that should access these variables directly are:
1571 #
1572 # log_writer_start($logfilename)
1573 #     Starts an arv-put pipe, reading data on stdin and writing it to
1574 #     a $logfilename file in an output collection.
1575 #
1576 # log_writer_read_output([$timeout])
1577 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1578 #     Passes $timeout to the select() call, with a default of 0.01.
1579 #     Returns the result of the last read() call on $log_pipe_out, or
1580 #     -1 if read() wasn't called because select() timed out.
1581 #     Only other log_writer_* functions should need to call this.
1582 #
1583 # log_writer_send($txt)
1584 #     Writes $txt to the output log collection.
1585 #
1586 # log_writer_finish()
1587 #     Closes the arv-put pipe and returns the output that it produces.
1588 #
1589 # log_writer_is_active()
1590 #     Returns a true value if there is currently a live arv-put
1591 #     process, false otherwise.
1592 #
1593 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1594     $log_pipe_pid);
1595
1596 sub log_writer_start($)
1597 {
1598   my $logfilename = shift;
1599   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1600                         'arv-put',
1601                         '--stream',
1602                         '--retries', '3',
1603                         '--filename', $logfilename,
1604                         '-');
1605   $log_pipe_out_buf = "";
1606   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1607 }
1608
1609 sub log_writer_read_output {
1610   my $timeout = shift || 0.01;
1611   my $read = -1;
1612   while ($read && $log_pipe_out_select->can_read($timeout)) {
1613     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1614                  length($log_pipe_out_buf));
1615   }
1616   if (!defined($read)) {
1617     Log(undef, "error reading log manifest from arv-put: $!");
1618   }
1619   return $read;
1620 }
1621
1622 sub log_writer_send($)
1623 {
1624   my $txt = shift;
1625   print $log_pipe_in $txt;
1626   log_writer_read_output();
1627 }
1628
1629 sub log_writer_finish()
1630 {
1631   return unless $log_pipe_pid;
1632
1633   close($log_pipe_in);
1634
1635   my $read_result = log_writer_read_output(120);
1636   if ($read_result == -1) {
1637     Log (undef, "timed out reading from 'arv-put'");
1638   } elsif ($read_result != 0) {
1639     Log(undef, "failed to read arv-put log manifest to EOF");
1640   }
1641
1642   waitpid($log_pipe_pid, 0);
1643   if ($?) {
1644     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1645   }
1646
1647   close($log_pipe_out);
1648   my $arv_put_output = $log_pipe_out_buf;
1649   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1650       $log_pipe_out_select = undef;
1651
1652   return $arv_put_output;
1653 }
1654
1655 sub log_writer_is_active() {
1656   return $log_pipe_pid;
1657 }
1658
1659 sub Log                         # ($jobstep_id, $logmessage)
1660 {
1661   if ($_[1] =~ /\n/) {
1662     for my $line (split (/\n/, $_[1])) {
1663       Log ($_[0], $line);
1664     }
1665     return;
1666   }
1667   my $fh = select STDERR; $|=1; select $fh;
1668   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1669   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1670   $message .= "\n";
1671   my $datetime;
1672   if (log_writer_is_active() || -t STDERR) {
1673     my @gmtime = gmtime;
1674     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1675                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1676   }
1677   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1678
1679   if (log_writer_is_active()) {
1680     log_writer_send($datetime . " " . $message);
1681   }
1682 }
1683
1684
1685 sub croak
1686 {
1687   my ($package, $file, $line) = caller;
1688   my $message = "@_ at $file line $line\n";
1689   Log (undef, $message);
1690   freeze() if @jobstep_todo;
1691   create_output_collection() if @jobstep_todo;
1692   cleanup();
1693   save_meta();
1694   die;
1695 }
1696
1697
1698 sub cleanup
1699 {
1700   return unless $Job;
1701   if ($Job->{'state'} eq 'Cancelled') {
1702     $Job->update_attributes('finished_at' => scalar gmtime);
1703   } else {
1704     $Job->update_attributes('state' => 'Failed');
1705   }
1706 }
1707
1708
1709 sub save_meta
1710 {
1711   my $justcheckpoint = shift; # false if this will be the last meta saved
1712   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1713   return unless log_writer_is_active();
1714
1715   my $log_manifest = "";
1716   if ($Job->{log}) {
1717     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1718     $log_manifest .= $prev_log_coll->{manifest_text};
1719   }
1720   $log_manifest .= log_writer_finish();
1721
1722   my $log_coll = api_call(
1723     "collections/create", ensure_unique_name => 1, collection => {
1724       manifest_text => $log_manifest,
1725       owner_uuid => $Job->{owner_uuid},
1726       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1727     });
1728   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1729   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1730 }
1731
1732
1733 sub freeze_if_want_freeze
1734 {
1735   if ($main::please_freeze)
1736   {
1737     release_allocation();
1738     if (@_)
1739     {
1740       # kill some srun procs before freeze+stop
1741       map { $proc{$_} = {} } @_;
1742       while (%proc)
1743       {
1744         killem (keys %proc);
1745         select (undef, undef, undef, 0.1);
1746         my $died;
1747         while (($died = waitpid (-1, WNOHANG)) > 0)
1748         {
1749           delete $proc{$died};
1750         }
1751       }
1752     }
1753     freeze();
1754     create_output_collection();
1755     cleanup();
1756     save_meta();
1757     exit 1;
1758   }
1759 }
1760
1761
1762 sub freeze
1763 {
1764   Log (undef, "Freeze not implemented");
1765   return;
1766 }
1767
1768
1769 sub thaw
1770 {
1771   croak ("Thaw not implemented");
1772 }
1773
1774
1775 sub freezequote
1776 {
1777   my $s = shift;
1778   $s =~ s/\\/\\\\/g;
1779   $s =~ s/\n/\\n/g;
1780   return $s;
1781 }
1782
1783
1784 sub freezeunquote
1785 {
1786   my $s = shift;
1787   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1788   return $s;
1789 }
1790
1791
1792 sub srun
1793 {
1794   my $srunargs = shift;
1795   my $execargs = shift;
1796   my $opts = shift || {};
1797   my $stdin = shift;
1798   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1799
1800   $Data::Dumper::Terse = 1;
1801   $Data::Dumper::Indent = 0;
1802   my $show_cmd = Dumper($args);
1803   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1804   $show_cmd =~ s/\n/ /g;
1805   if ($opts->{fork}) {
1806     Log(undef, "starting: $show_cmd");
1807   } else {
1808     # This is a child process: parent is in charge of reading our
1809     # stderr and copying it to Log() if needed.
1810     warn "starting: $show_cmd\n";
1811   }
1812
1813   if (defined $stdin) {
1814     my $child = open STDIN, "-|";
1815     defined $child or die "no fork: $!";
1816     if ($child == 0) {
1817       print $stdin or die $!;
1818       close STDOUT or die $!;
1819       exit 0;
1820     }
1821   }
1822
1823   return system (@$args) if $opts->{fork};
1824
1825   exec @$args;
1826   warn "ENV size is ".length(join(" ",%ENV));
1827   die "exec failed: $!: @$args";
1828 }
1829
1830
1831 sub ban_node_by_slot {
1832   # Don't start any new jobsteps on this node for 60 seconds
1833   my $slotid = shift;
1834   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1835   $slot[$slotid]->{node}->{hold_count}++;
1836   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1837 }
1838
1839 sub must_lock_now
1840 {
1841   my ($lockfile, $error_message) = @_;
1842   open L, ">", $lockfile or croak("$lockfile: $!");
1843   if (!flock L, LOCK_EX|LOCK_NB) {
1844     croak("Can't lock $lockfile: $error_message\n");
1845   }
1846 }
1847
1848 sub find_docker_image {
1849   # Given a Keep locator, check to see if it contains a Docker image.
1850   # If so, return its stream name and Docker hash.
1851   # If not, return undef for both values.
1852   my $locator = shift;
1853   my ($streamname, $filename);
1854   my $image = api_call("collections/get", uuid => $locator);
1855   if ($image) {
1856     foreach my $line (split(/\n/, $image->{manifest_text})) {
1857       my @tokens = split(/\s+/, $line);
1858       next if (!@tokens);
1859       $streamname = shift(@tokens);
1860       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1861         if (defined($filename)) {
1862           return (undef, undef);  # More than one file in the Collection.
1863         } else {
1864           $filename = (split(/:/, $filedata, 3))[2];
1865         }
1866       }
1867     }
1868   }
1869   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1870     return ($streamname, $1);
1871   } else {
1872     return (undef, undef);
1873   }
1874 }
1875
1876 sub retry_count {
1877   # Calculate the number of times an operation should be retried,
1878   # assuming exponential backoff, and that we're willing to retry as
1879   # long as tasks have been running.  Enforce a minimum of 3 retries.
1880   my ($starttime, $endtime, $timediff, $retries);
1881   if (@jobstep) {
1882     $starttime = $jobstep[0]->{starttime};
1883     $endtime = $jobstep[-1]->{finishtime};
1884   }
1885   if (!defined($starttime)) {
1886     $timediff = 0;
1887   } elsif (!defined($endtime)) {
1888     $timediff = time - $starttime;
1889   } else {
1890     $timediff = ($endtime - $starttime) - (time - $endtime);
1891   }
1892   if ($timediff > 0) {
1893     $retries = int(log($timediff) / log(2));
1894   } else {
1895     $retries = 1;  # Use the minimum.
1896   }
1897   return ($retries > 3) ? $retries : 3;
1898 }
1899
1900 sub retry_op {
1901   # Pass in two function references.
1902   # This method will be called with the remaining arguments.
1903   # If it dies, retry it with exponential backoff until it succeeds,
1904   # or until the current retry_count is exhausted.  After each failure
1905   # that can be retried, the second function will be called with
1906   # the current try count (0-based), next try time, and error message.
1907   my $operation = shift;
1908   my $retry_callback = shift;
1909   my $retries = retry_count();
1910   foreach my $try_count (0..$retries) {
1911     my $next_try = time + (2 ** $try_count);
1912     my $result = eval { $operation->(@_); };
1913     if (!$@) {
1914       return $result;
1915     } elsif ($try_count < $retries) {
1916       $retry_callback->($try_count, $next_try, $@);
1917       my $sleep_time = $next_try - time;
1918       sleep($sleep_time) if ($sleep_time > 0);
1919     }
1920   }
1921   # Ensure the error message ends in a newline, so Perl doesn't add
1922   # retry_op's line number to it.
1923   chomp($@);
1924   die($@ . "\n");
1925 }
1926
1927 sub api_call {
1928   # Pass in a /-separated API method name, and arguments for it.
1929   # This function will call that method, retrying as needed until
1930   # the current retry_count is exhausted, with a log on the first failure.
1931   my $method_name = shift;
1932   my $log_api_retry = sub {
1933     my ($try_count, $next_try_at, $errmsg) = @_;
1934     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1935     $errmsg =~ s/\s/ /g;
1936     $errmsg =~ s/\s+$//;
1937     my $retry_msg;
1938     if ($next_try_at < time) {
1939       $retry_msg = "Retrying.";
1940     } else {
1941       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1942       $retry_msg = "Retrying at $next_try_fmt.";
1943     }
1944     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1945   };
1946   my $method = $arv;
1947   foreach my $key (split(/\//, $method_name)) {
1948     $method = $method->{$key};
1949   }
1950   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1951 }
1952
1953 sub exit_status_s {
1954   # Given a $?, return a human-readable exit code string like "0" or
1955   # "1" or "0 with signal 1" or "1 with signal 11".
1956   my $exitcode = shift;
1957   my $s = $exitcode >> 8;
1958   if ($exitcode & 0x7f) {
1959     $s .= " with signal " . ($exitcode & 0x7f);
1960   }
1961   if ($exitcode & 0x80) {
1962     $s .= " with core dump";
1963   }
1964   return $s;
1965 }
1966
1967 sub handle_readall {
1968   # Pass in a glob reference to a file handle.
1969   # Read all its contents and return them as a string.
1970   my $fh_glob_ref = shift;
1971   local $/ = undef;
1972   return <$fh_glob_ref>;
1973 }
1974
1975 sub tar_filename_n {
1976   my $n = shift;
1977   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1978 }
1979
1980 sub add_git_archive {
1981   # Pass in a git archive command as a string or list, a la system().
1982   # This method will save its output to be included in the archive sent to the
1983   # build script.
1984   my $git_input;
1985   $git_tar_count++;
1986   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1987     croak("Failed to save git archive: $!");
1988   }
1989   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1990   close($git_input);
1991   waitpid($git_pid, 0);
1992   close(GIT_ARCHIVE);
1993   if ($?) {
1994     croak("Failed to save git archive: git exited " . exit_status_s($?));
1995   }
1996 }
1997
1998 sub combined_git_archive {
1999   # Combine all saved tar archives into a single archive, then return its
2000   # contents in a string.  Return undef if no archives have been saved.
2001   if ($git_tar_count < 1) {
2002     return undef;
2003   }
2004   my $base_tar_name = tar_filename_n(1);
2005   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2006     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2007     if ($tar_exit != 0) {
2008       croak("Error preparing build archive: tar -A exited " .
2009             exit_status_s($tar_exit));
2010     }
2011   }
2012   if (!open(GIT_TAR, "<", $base_tar_name)) {
2013     croak("Could not open build archive: $!");
2014   }
2015   my $tar_contents = handle_readall(\*GIT_TAR);
2016   close(GIT_TAR);
2017   return $tar_contents;
2018 }
2019
2020 sub set_nonblocking {
2021   my $fh = shift;
2022   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2023   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2024 }
2025
2026 __DATA__
2027 #!/usr/bin/perl
2028 #
2029 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2030 # server invokes this script on individual compute nodes, or localhost if we're
2031 # running a job locally.  It gets called in two modes:
2032 #
2033 # * No arguments: Installation mode.  Read a tar archive from the DATA
2034 #   file handle; it includes the Crunch script's source code, and
2035 #   maybe SDKs as well.  Those should be installed in the proper
2036 #   locations.  This runs outside of any Docker container, so don't try to
2037 #   introspect Crunch's runtime environment.
2038 #
2039 # * With arguments: Crunch script run mode.  This script should set up the
2040 #   environment, then run the command specified in the arguments.  This runs
2041 #   inside any Docker container.
2042
2043 use Fcntl ':flock';
2044 use File::Path qw( make_path remove_tree );
2045 use POSIX qw(getcwd);
2046
2047 use constant TASK_TEMPFAIL => 111;
2048
2049 # Map SDK subdirectories to the path environments they belong to.
2050 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2051
2052 my $destdir = $ENV{"CRUNCH_SRC"};
2053 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2054 my $repo = $ENV{"CRUNCH_SRC_URL"};
2055 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2056 my $job_work = $ENV{"JOB_WORK"};
2057 my $task_work = $ENV{"TASK_WORK"};
2058
2059 open(STDOUT_ORIG, ">&", STDOUT);
2060 open(STDERR_ORIG, ">&", STDERR);
2061
2062 for my $dir ($destdir, $job_work, $task_work) {
2063   if ($dir) {
2064     make_path $dir;
2065     -e $dir or die "Failed to create temporary directory ($dir): $!";
2066   }
2067 }
2068
2069 if ($task_work) {
2070   remove_tree($task_work, {keep_root => 1});
2071 }
2072
2073 ### Crunch script run mode
2074 if (@ARGV) {
2075   # We want to do routine logging during task 0 only.  This gives the user
2076   # the information they need, but avoids repeating the information for every
2077   # task.
2078   my $Log;
2079   if ($ENV{TASK_SEQUENCE} eq "0") {
2080     $Log = sub {
2081       my $msg = shift;
2082       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2083     };
2084   } else {
2085     $Log = sub { };
2086   }
2087
2088   my $python_src = "$install_dir/python";
2089   my $venv_dir = "$job_work/.arvados.venv";
2090   my $venv_built = -e "$venv_dir/bin/activate";
2091   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2092     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2093                  "--python=python2.7", $venv_dir);
2094     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2095     $venv_built = 1;
2096     $Log->("Built Python SDK virtualenv");
2097   }
2098
2099   my $pip_bin = "pip";
2100   if ($venv_built) {
2101     $Log->("Running in Python SDK virtualenv");
2102     $pip_bin = "$venv_dir/bin/pip";
2103     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2104     @ARGV = ("/bin/sh", "-ec",
2105              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2106   } elsif (-d $python_src) {
2107     $Log->("Warning: virtualenv not found inside Docker container default " .
2108            "\$PATH. Can't install Python SDK.");
2109   }
2110
2111   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2112   if ($pkgs) {
2113     $Log->("Using Arvados SDK:");
2114     foreach my $line (split /\n/, $pkgs) {
2115       $Log->($line);
2116     }
2117   } else {
2118     $Log->("Arvados SDK packages not found");
2119   }
2120
2121   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2122     my $sdk_path = "$install_dir/$sdk_dir";
2123     if (-d $sdk_path) {
2124       if ($ENV{$sdk_envkey}) {
2125         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2126       } else {
2127         $ENV{$sdk_envkey} = $sdk_path;
2128       }
2129       $Log->("Arvados SDK added to %s", $sdk_envkey);
2130     }
2131   }
2132
2133   exec(@ARGV);
2134   die "Cannot exec `@ARGV`: $!";
2135 }
2136
2137 ### Installation mode
2138 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2139 flock L, LOCK_EX;
2140 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2141   # This exact git archive (source + arvados sdk) is already installed
2142   # here, so there's no need to reinstall it.
2143
2144   # We must consume our DATA section, though: otherwise the process
2145   # feeding it to us will get SIGPIPE.
2146   my $buf;
2147   while (read(DATA, $buf, 65536)) { }
2148
2149   exit(0);
2150 }
2151
2152 unlink "$destdir.archive_hash";
2153 mkdir $destdir;
2154
2155 do {
2156   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2157   local $SIG{PIPE} = "IGNORE";
2158   warn "Extracting archive: $archive_hash\n";
2159   # --ignore-zeros is necessary sometimes: depending on how much NUL
2160   # padding tar -A put on our combined archive (which in turn depends
2161   # on the length of the component archives) tar without
2162   # --ignore-zeros will exit before consuming stdin and cause close()
2163   # to fail on the resulting SIGPIPE.
2164   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2165     die "Error launching 'tar -xC $destdir': $!";
2166   }
2167   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2168   # get SIGPIPE.  We must feed it data incrementally.
2169   my $tar_input;
2170   while (read(DATA, $tar_input, 65536)) {
2171     print TARX $tar_input;
2172   }
2173   if(!close(TARX)) {
2174     die "'tar -xC $destdir' exited $?: $!";
2175   }
2176 };
2177
2178 mkdir $install_dir;
2179
2180 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2181 if (-d $sdk_root) {
2182   foreach my $sdk_lang (("python",
2183                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2184     if (-d "$sdk_root/$sdk_lang") {
2185       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2186         die "Failed to install $sdk_lang SDK: $!";
2187       }
2188     }
2189   }
2190 }
2191
2192 my $python_dir = "$install_dir/python";
2193 if ((-d $python_dir) and can_run("python2.7")) {
2194   open(my $egg_info_pipe, "-|",
2195        "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null");
2196   my @egg_info_errors = <$egg_info_pipe>;
2197   close($egg_info_pipe);
2198   if ($?) {
2199     if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) {
2200       # egg_info apparently failed because it couldn't ask git for a build tag.
2201       # Specify no build tag.
2202       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2203       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2204       close($pysdk_cfg);
2205     } else {
2206       my $egg_info_exit = $? >> 8;
2207       foreach my $errline (@egg_info_errors) {
2208         print STDERR_ORIG $errline;
2209       }
2210       warn "python setup.py egg_info failed: exit $egg_info_exit";
2211       exit ($egg_info_exit || 1);
2212     }
2213   }
2214 }
2215
2216 # Hide messages from the install script (unless it fails: shell_or_die
2217 # will show $destdir.log in that case).
2218 open(STDOUT, ">>", "$destdir.log");
2219 open(STDERR, ">&", STDOUT);
2220
2221 if (-e "$destdir/crunch_scripts/install") {
2222     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2223 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2224     # Old version
2225     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2226 } elsif (-e "./install.sh") {
2227     shell_or_die (undef, "./install.sh", $install_dir);
2228 }
2229
2230 if ($archive_hash) {
2231     unlink "$destdir.archive_hash.new";
2232     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2233     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2234 }
2235
2236 close L;
2237
2238 sub can_run {
2239   my $command_name = shift;
2240   open(my $which, "-|", "which", $command_name);
2241   while (<$which>) { }
2242   close($which);
2243   return ($? == 0);
2244 }
2245
2246 sub shell_or_die
2247 {
2248   my $exitcode = shift;
2249
2250   if ($ENV{"DEBUG"}) {
2251     print STDERR "@_\n";
2252   }
2253   if (system (@_) != 0) {
2254     my $err = $!;
2255     my $code = $?;
2256     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2257     open STDERR, ">&STDERR_ORIG";
2258     system ("cat $destdir.log >&2");
2259     warn "@_ failed ($err): $exitstatus";
2260     if (defined($exitcode)) {
2261       exit $exitcode;
2262     }
2263     else {
2264       exit (($code >> 8) || 1);
2265     }
2266   }
2267 }
2268
2269 __DATA__