refs #6218
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "/usr/bin/docker.io";
130 GetOptions('force-unlock' => \$force_unlock,
131            'git-dir=s' => \$git_dir,
132            'job=s' => \$jobspec,
133            'job-api-token=s' => \$job_api_token,
134            'no-clear-tmp' => \$no_clear_tmp,
135            'resume-stash=s' => \$resume_stash,
136            'docker-bin=s' => \$docker_bin,
137     );
138
139 if (defined $job_api_token) {
140   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
141 }
142
143 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
144
145
146 $SIG{'USR1'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 1;
149 };
150 $SIG{'USR2'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 0;
153 };
154
155 my $arv = Arvados->new('apiVersion' => 'v1');
156
157 my $Job;
158 my $job_id;
159 my $dbh;
160 my $sth;
161 my @jobstep;
162
163 my $local_job;
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   $local_job = 0;
169 }
170 else
171 {
172   $Job = JSON::decode_json($jobspec);
173   $local_job = 1;
174 }
175
176
177 # Make sure our workers (our slurm nodes, localhost, or whatever) are
178 # at least able to run basic commands: they aren't down or severely
179 # misconfigured.
180 my $cmd = ['true'];
181 if ($Job->{docker_image_locator}) {
182   $cmd = [$docker_bin, 'ps', '-q'];
183 }
184 Log(undef, "Sanity check is `@$cmd`");
185 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
186      $cmd,
187      {fork => 1});
188 if ($? != 0) {
189   Log(undef, "Sanity check failed: ".exit_status_s($?));
190   exit EX_TEMPFAIL;
191 }
192 Log(undef, "Sanity check OK");
193
194
195 my $User = api_call("users/current");
196
197 if (!$local_job) {
198   if (!$force_unlock) {
199     # Claim this job, and make sure nobody else does
200     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
201     if ($@) {
202       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
203       exit EX_TEMPFAIL;
204     };
205   }
206 }
207 else
208 {
209   if (!$resume_stash)
210   {
211     map { croak ("No $_ specified") unless $Job->{$_} }
212     qw(script script_version script_parameters);
213   }
214
215   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
216   $Job->{'started_at'} = gmtime;
217   $Job->{'state'} = 'Running';
218
219   $Job = api_call("jobs/create", job => $Job);
220 }
221 $job_id = $Job->{'uuid'};
222
223 my $keep_logfile = $job_id . '.log.txt';
224 log_writer_start($keep_logfile);
225
226 $Job->{'runtime_constraints'} ||= {};
227 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
228 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
229
230 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
231 if ($? == 0) {
232   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
233   chomp($gem_versions);
234   chop($gem_versions);  # Closing parentheses
235 } else {
236   $gem_versions = "";
237 }
238 Log(undef,
239     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
240
241 Log (undef, "check slurm allocation");
242 my @slot;
243 my @node;
244 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
245 my @sinfo;
246 if (!$have_slurm)
247 {
248   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
249   push @sinfo, "$localcpus localhost";
250 }
251 if (exists $ENV{SLURM_NODELIST})
252 {
253   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
254 }
255 foreach (@sinfo)
256 {
257   my ($ncpus, $slurm_nodelist) = split;
258   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
259
260   my @nodelist;
261   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
262   {
263     my $nodelist = $1;
264     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
265     {
266       my $ranges = $1;
267       foreach (split (",", $ranges))
268       {
269         my ($a, $b);
270         if (/(\d+)-(\d+)/)
271         {
272           $a = $1;
273           $b = $2;
274         }
275         else
276         {
277           $a = $_;
278           $b = $_;
279         }
280         push @nodelist, map {
281           my $n = $nodelist;
282           $n =~ s/\[[-,\d]+\]/$_/;
283           $n;
284         } ($a..$b);
285       }
286     }
287     else
288     {
289       push @nodelist, $nodelist;
290     }
291   }
292   foreach my $nodename (@nodelist)
293   {
294     Log (undef, "node $nodename - $ncpus slots");
295     my $node = { name => $nodename,
296                  ncpus => $ncpus,
297                  # The number of consecutive times a task has been dispatched
298                  # to this node and failed.
299                  losing_streak => 0,
300                  # The number of consecutive times that SLURM has reported
301                  # a node failure since the last successful task.
302                  fail_count => 0,
303                  # Don't dispatch work to this node until this time
304                  # (in seconds since the epoch) has passed.
305                  hold_until => 0 };
306     foreach my $cpu (1..$ncpus)
307     {
308       push @slot, { node => $node,
309                     cpu => $cpu };
310     }
311   }
312   push @node, @nodelist;
313 }
314
315
316
317 # Ensure that we get one jobstep running on each allocated node before
318 # we start overloading nodes with concurrent steps
319
320 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
321
322
323 $Job->update_attributes(
324   'tasks_summary' => { 'failed' => 0,
325                        'todo' => 1,
326                        'running' => 0,
327                        'done' => 0 });
328
329 Log (undef, "start");
330 $SIG{'INT'} = sub { $main::please_freeze = 1; };
331 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
332 $SIG{'TERM'} = \&croak;
333 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
334 $SIG{'ALRM'} = sub { $main::please_info = 1; };
335 $SIG{'CONT'} = sub { $main::please_continue = 1; };
336 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
337
338 $main::please_freeze = 0;
339 $main::please_info = 0;
340 $main::please_continue = 0;
341 $main::please_refresh = 0;
342 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
343
344 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
345 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
346 $ENV{"JOB_UUID"} = $job_id;
347
348
349 my @jobstep_todo = ();
350 my @jobstep_done = ();
351 my @jobstep_tomerge = ();
352 my $jobstep_tomerge_level = 0;
353 my $squeue_checked = 0;
354 my $latest_refresh = scalar time;
355
356
357
358 if (defined $Job->{thawedfromkey})
359 {
360   thaw ($Job->{thawedfromkey});
361 }
362 else
363 {
364   my $first_task = api_call("job_tasks/create", job_task => {
365     'job_uuid' => $Job->{'uuid'},
366     'sequence' => 0,
367     'qsequence' => 0,
368     'parameters' => {},
369   });
370   push @jobstep, { 'level' => 0,
371                    'failures' => 0,
372                    'arvados_task' => $first_task,
373                  };
374   push @jobstep_todo, 0;
375 }
376
377
378 if (!$have_slurm)
379 {
380   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
381 }
382
383 my $build_script = handle_readall(\*DATA);
384 my $nodelist = join(",", @node);
385 my $git_tar_count = 0;
386
387 if (!defined $no_clear_tmp) {
388   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
389   Log (undef, "Clean work dirs");
390
391   my $cleanpid = fork();
392   if ($cleanpid == 0)
393   {
394     # Find FUSE mounts that look like Keep mounts (the mount path has the
395     # word "keep") and unmount them.  Then clean up work directories.
396     # TODO: When #5036 is done and widely deployed, we can get rid of the
397     # regular expression and just unmount everything with type fuse.keep.
398     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
399           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
400     exit (1);
401   }
402   while (1)
403   {
404     last if $cleanpid == waitpid (-1, WNOHANG);
405     freeze_if_want_freeze ($cleanpid);
406     select (undef, undef, undef, 0.1);
407   }
408   Log (undef, "Cleanup command exited ".exit_status_s($?));
409 }
410
411 # If this job requires a Docker image, install that.
412 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
413 if ($docker_locator = $Job->{docker_image_locator}) {
414   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
415   if (!$docker_hash)
416   {
417     croak("No Docker image hash found from locator $docker_locator");
418   }
419   $docker_stream =~ s/^\.//;
420   my $docker_install_script = qq{
421 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
422     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
423 fi
424 };
425   my $docker_pid = fork();
426   if ($docker_pid == 0)
427   {
428     srun (["srun", "--nodelist=" . join(',', @node)],
429           ["/bin/sh", "-ec", $docker_install_script]);
430     exit ($?);
431   }
432   while (1)
433   {
434     last if $docker_pid == waitpid (-1, WNOHANG);
435     freeze_if_want_freeze ($docker_pid);
436     select (undef, undef, undef, 0.1);
437   }
438   if ($? != 0)
439   {
440     croak("Installing Docker image from $docker_locator exited "
441           .exit_status_s($?));
442   }
443
444   # Determine whether this version of Docker supports memory+swap limits.
445   srun(["srun", "--nodelist=" . $node[0]],
446        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
447       {fork => 1});
448   $docker_limitmem = ($? == 0);
449
450   if ($Job->{arvados_sdk_version}) {
451     # The job also specifies an Arvados SDK version.  Add the SDKs to the
452     # tar file for the build script to install.
453     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
454                        $Job->{arvados_sdk_version}));
455     add_git_archive("git", "--git-dir=$git_dir", "archive",
456                     "--prefix=.arvados.sdk/",
457                     $Job->{arvados_sdk_version}, "sdk");
458   }
459 }
460
461 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
462   # If script_version looks like an absolute path, *and* the --git-dir
463   # argument was not given -- which implies we were not invoked by
464   # crunch-dispatch -- we will use the given path as a working
465   # directory instead of resolving script_version to a git commit (or
466   # doing anything else with git).
467   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
468   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
469 }
470 else {
471   # Resolve the given script_version to a git commit sha1. Also, if
472   # the repository is remote, clone it into our local filesystem: this
473   # ensures "git archive" will work, and is necessary to reliably
474   # resolve a symbolic script_version like "master^".
475   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
476
477   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
478
479   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
480
481   # If we're running under crunch-dispatch, it will have already
482   # pulled the appropriate source tree into its own repository, and
483   # given us that repo's path as $git_dir.
484   #
485   # If we're running a "local" job, we might have to fetch content
486   # from a remote repository.
487   #
488   # (Currently crunch-dispatch gives a local path with --git-dir, but
489   # we might as well accept URLs there too in case it changes its
490   # mind.)
491   my $repo = $git_dir || $Job->{'repository'};
492
493   # Repository can be remote or local. If remote, we'll need to fetch it
494   # to a local dir before doing `git log` et al.
495   my $repo_location;
496
497   if ($repo =~ m{://|^[^/]*:}) {
498     # $repo is a git url we can clone, like git:// or https:// or
499     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
500     # not recognized here because distinguishing that from a local
501     # path is too fragile. If you really need something strange here,
502     # use the ssh:// form.
503     $repo_location = 'remote';
504   } elsif ($repo =~ m{^\.*/}) {
505     # $repo is a local path to a git index. We'll also resolve ../foo
506     # to ../foo/.git if the latter is a directory. To help
507     # disambiguate local paths from named hosted repositories, this
508     # form must be given as ./ or ../ if it's a relative path.
509     if (-d "$repo/.git") {
510       $repo = "$repo/.git";
511     }
512     $repo_location = 'local';
513   } else {
514     # $repo is none of the above. It must be the name of a hosted
515     # repository.
516     my $arv_repo_list = api_call("repositories/list",
517                                  'filters' => [['name','=',$repo]]);
518     my @repos_found = @{$arv_repo_list->{'items'}};
519     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
520     if ($n_found > 0) {
521       Log(undef, "Repository '$repo' -> "
522           . join(", ", map { $_->{'uuid'} } @repos_found));
523     }
524     if ($n_found != 1) {
525       croak("Error: Found $n_found repositories with name '$repo'.");
526     }
527     $repo = $repos_found[0]->{'fetch_url'};
528     $repo_location = 'remote';
529   }
530   Log(undef, "Using $repo_location repository '$repo'");
531   $ENV{"CRUNCH_SRC_URL"} = $repo;
532
533   # Resolve given script_version (we'll call that $treeish here) to a
534   # commit sha1 ($commit).
535   my $treeish = $Job->{'script_version'};
536   my $commit;
537   if ($repo_location eq 'remote') {
538     # We minimize excess object-fetching by re-using the same bare
539     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
540     # just keep adding remotes to it as needed.
541     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
542     my $gitcmd = "git --git-dir=\Q$local_repo\E";
543
544     # Set up our local repo for caching remote objects, making
545     # archives, etc.
546     if (!-d $local_repo) {
547       make_path($local_repo) or croak("Error: could not create $local_repo");
548     }
549     # This works (exits 0 and doesn't delete fetched objects) even
550     # if $local_repo is already initialized:
551     `$gitcmd init --bare`;
552     if ($?) {
553       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
554     }
555
556     # If $treeish looks like a hash (or abbrev hash) we look it up in
557     # our local cache first, since that's cheaper. (We don't want to
558     # do that with tags/branches though -- those change over time, so
559     # they should always be resolved by the remote repo.)
560     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
561       # Hide stderr because it's normal for this to fail:
562       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
563       if ($? == 0 &&
564           # Careful not to resolve a branch named abcdeff to commit 1234567:
565           $sha1 =~ /^$treeish/ &&
566           $sha1 =~ /^([0-9a-f]{40})$/s) {
567         $commit = $1;
568         Log(undef, "Commit $commit already present in $local_repo");
569       }
570     }
571
572     if (!defined $commit) {
573       # If $treeish isn't just a hash or abbrev hash, or isn't here
574       # yet, we need to fetch the remote to resolve it correctly.
575
576       # First, remove all local heads. This prevents a name that does
577       # not exist on the remote from resolving to (or colliding with)
578       # a previously fetched branch or tag (possibly from a different
579       # remote).
580       remove_tree("$local_repo/refs/heads", {keep_root => 1});
581
582       Log(undef, "Fetching objects from $repo to $local_repo");
583       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
584       if ($?) {
585         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
586       }
587     }
588
589     # Now that the data is all here, we will use our local repo for
590     # the rest of our git activities.
591     $repo = $local_repo;
592   }
593
594   my $gitcmd = "git --git-dir=\Q$repo\E";
595   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
596   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
597     croak("`$gitcmd rev-list` exited "
598           .exit_status_s($?)
599           .", '$treeish' not found. Giving up.");
600   }
601   $commit = $1;
602   Log(undef, "Version $treeish is commit $commit");
603
604   if ($commit ne $Job->{'script_version'}) {
605     # Record the real commit id in the database, frozentokey, logs,
606     # etc. -- instead of an abbreviation or a branch name which can
607     # become ambiguous or point to a different commit in the future.
608     if (!$Job->update_attributes('script_version' => $commit)) {
609       croak("Error: failed to update job's script_version attribute");
610     }
611   }
612
613   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
614   add_git_archive("$gitcmd archive ''\Q$commit\E");
615 }
616
617 my $git_archive = combined_git_archive();
618 if (!defined $git_archive) {
619   Log(undef, "Skip install phase (no git archive)");
620   if ($have_slurm) {
621     Log(undef, "Warning: This probably means workers have no source tree!");
622   }
623 }
624 else {
625   my $install_exited;
626   my $install_script_tries_left = 3;
627   for (my $attempts = 0; $attempts < 3; $attempts++) {
628     Log(undef, "Run install script on all workers");
629
630     my @srunargs = ("srun",
631                     "--nodelist=$nodelist",
632                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
633     my @execargs = ("sh", "-c",
634                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
635
636     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
637     my ($install_stderr_r, $install_stderr_w);
638     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
639     set_nonblocking($install_stderr_r);
640     my $installpid = fork();
641     if ($installpid == 0)
642     {
643       close($install_stderr_r);
644       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
645       open(STDOUT, ">&", $install_stderr_w);
646       open(STDERR, ">&", $install_stderr_w);
647       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
648       exit (1);
649     }
650     close($install_stderr_w);
651     # Tell freeze_if_want_freeze how to kill the child, otherwise the
652     # "waitpid(installpid)" loop won't get interrupted by a freeze:
653     $proc{$installpid} = {};
654     my $stderr_buf = '';
655     # Track whether anything appears on stderr other than slurm errors
656     # ("srun: ...") and the "starting: ..." message printed by the
657     # srun subroutine itself:
658     my $stderr_anything_from_script = 0;
659     my $match_our_own_errors = '^(srun: error: |starting: \[)';
660     while ($installpid != waitpid(-1, WNOHANG)) {
661       freeze_if_want_freeze ($installpid);
662       # Wait up to 0.1 seconds for something to appear on stderr, then
663       # do a non-blocking read.
664       my $bits = fhbits($install_stderr_r);
665       select ($bits, undef, $bits, 0.1);
666       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
667       {
668         while ($stderr_buf =~ /^(.*?)\n/) {
669           my $line = $1;
670           substr $stderr_buf, 0, 1+length($line), "";
671           Log(undef, "stderr $line");
672           if ($line !~ /$match_our_own_errors/) {
673             $stderr_anything_from_script = 1;
674           }
675         }
676       }
677     }
678     delete $proc{$installpid};
679     $install_exited = $?;
680     close($install_stderr_r);
681     if (length($stderr_buf) > 0) {
682       if ($stderr_buf !~ /$match_our_own_errors/) {
683         $stderr_anything_from_script = 1;
684       }
685       Log(undef, "stderr $stderr_buf")
686     }
687
688     Log (undef, "Install script exited ".exit_status_s($install_exited));
689     last if $install_exited == 0 || $main::please_freeze;
690     # If the install script fails but doesn't print an error message,
691     # the next thing anyone is likely to do is just run it again in
692     # case it was a transient problem like "slurm communication fails
693     # because the network isn't reliable enough". So we'll just do
694     # that ourselves (up to 3 attempts in total). OTOH, if there is an
695     # error message, the problem is more likely to have a real fix and
696     # we should fail the job so the fixing process can start, instead
697     # of doing 2 more attempts.
698     last if $stderr_anything_from_script;
699   }
700
701   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
702     unlink($tar_filename);
703   }
704
705   if ($install_exited != 0) {
706     croak("Giving up");
707   }
708 }
709
710 foreach (qw (script script_version script_parameters runtime_constraints))
711 {
712   Log (undef,
713        "$_ " .
714        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
715 }
716 foreach (split (/\n/, $Job->{knobs}))
717 {
718   Log (undef, "knob " . $_);
719 }
720
721
722
723 $main::success = undef;
724
725
726
727 ONELEVEL:
728
729 my $thisround_succeeded = 0;
730 my $thisround_failed = 0;
731 my $thisround_failed_multiple = 0;
732 my $working_slot_count = scalar(@slot);
733
734 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
735                        or $a <=> $b } @jobstep_todo;
736 my $level = $jobstep[$jobstep_todo[0]]->{level};
737
738 my $initial_tasks_this_level = 0;
739 foreach my $id (@jobstep_todo) {
740   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
741 }
742
743 # If the number of tasks scheduled at this level #T is smaller than the number
744 # of slots available #S, only use the first #T slots, or the first slot on
745 # each node, whichever number is greater.
746 #
747 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
748 # based on these numbers.  Using fewer slots makes more resources available
749 # to each individual task, which should normally be a better strategy when
750 # there are fewer of them running with less parallelism.
751 #
752 # Note that this calculation is not redone if the initial tasks at
753 # this level queue more tasks at the same level.  This may harm
754 # overall task throughput for that level.
755 my @freeslot;
756 if ($initial_tasks_this_level < @node) {
757   @freeslot = (0..$#node);
758 } elsif ($initial_tasks_this_level < @slot) {
759   @freeslot = (0..$initial_tasks_this_level - 1);
760 } else {
761   @freeslot = (0..$#slot);
762 }
763 my $round_num_freeslots = scalar(@freeslot);
764
765 my %round_max_slots = ();
766 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
767   my $this_slot = $slot[$freeslot[$ii]];
768   my $node_name = $this_slot->{node}->{name};
769   $round_max_slots{$node_name} ||= $this_slot->{cpu};
770   last if (scalar(keys(%round_max_slots)) >= @node);
771 }
772
773 Log(undef, "start level $level with $round_num_freeslots slots");
774 my @holdslot;
775 my %reader;
776 my $progress_is_dirty = 1;
777 my $progress_stats_updated = 0;
778
779 update_progress_stats();
780
781
782 THISROUND:
783 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
784 {
785   my $id = $jobstep_todo[$todo_ptr];
786   my $Jobstep = $jobstep[$id];
787   if ($Jobstep->{level} != $level)
788   {
789     next;
790   }
791
792   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
793   set_nonblocking($reader{$id});
794
795   my $childslot = $freeslot[0];
796   my $childnode = $slot[$childslot]->{node};
797   my $childslotname = join (".",
798                             $slot[$childslot]->{node}->{name},
799                             $slot[$childslot]->{cpu});
800
801   my $childpid = fork();
802   if ($childpid == 0)
803   {
804     $SIG{'INT'} = 'DEFAULT';
805     $SIG{'QUIT'} = 'DEFAULT';
806     $SIG{'TERM'} = 'DEFAULT';
807
808     foreach (values (%reader))
809     {
810       close($_);
811     }
812     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
813     open(STDOUT,">&writer");
814     open(STDERR,">&writer");
815
816     undef $dbh;
817     undef $sth;
818
819     delete $ENV{"GNUPGHOME"};
820     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
821     $ENV{"TASK_QSEQUENCE"} = $id;
822     $ENV{"TASK_SEQUENCE"} = $level;
823     $ENV{"JOB_SCRIPT"} = $Job->{script};
824     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
825       $param =~ tr/a-z/A-Z/;
826       $ENV{"JOB_PARAMETER_$param"} = $value;
827     }
828     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
829     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
830     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
831     $ENV{"HOME"} = $ENV{"TASK_WORK"};
832     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
833     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
834     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
835     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
836
837     $ENV{"GZIP"} = "-n";
838
839     my @srunargs = (
840       "srun",
841       "--nodelist=".$childnode->{name},
842       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
843       "--job-name=$job_id.$id.$$",
844         );
845     my $command =
846         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
847         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
848         ."&& cd $ENV{CRUNCH_TMP} "
849         # These environment variables get used explicitly later in
850         # $command.  No tool is expected to read these values directly.
851         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
852         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
853         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
854         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
855     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
856     if ($docker_hash)
857     {
858       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
859       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
860       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
861       # We only set memory limits if Docker lets us limit both memory and swap.
862       # Memory limits alone have been supported longer, but subprocesses tend
863       # to get SIGKILL if they exceed that without any swap limit set.
864       # See #5642 for additional background.
865       if ($docker_limitmem) {
866         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
867       }
868
869       # Dynamically configure the container to use the host system as its
870       # DNS server.  Get the host's global addresses from the ip command,
871       # and turn them into docker --dns options using gawk.
872       $command .=
873           q{$(ip -o address show scope global |
874               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
875
876       # The source tree and $destdir directory (which we have
877       # installed on the worker host) are available in the container,
878       # under the same path.
879       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
880       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
881
882       # Currently, we make arv-mount's mount point appear at /keep
883       # inside the container (instead of using the same path as the
884       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
885       # crunch scripts and utilities must not rely on this. They must
886       # use $TASK_KEEPMOUNT.
887       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
888       $ENV{TASK_KEEPMOUNT} = "/keep";
889
890       # TASK_WORK is almost exactly like a docker data volume: it
891       # starts out empty, is writable, and persists until no
892       # containers use it any more. We don't use --volumes-from to
893       # share it with other containers: it is only accessible to this
894       # task, and it goes away when this task stops.
895       #
896       # However, a docker data volume is writable only by root unless
897       # the mount point already happens to exist in the container with
898       # different permissions. Therefore, we [1] assume /tmp already
899       # exists in the image and is writable by the crunch user; [2]
900       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
901       # writable if they are created by docker while setting up the
902       # other --volumes); and [3] create $TASK_WORK inside the
903       # container using $build_script.
904       $command .= "--volume=/tmp ";
905       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
906       $ENV{"HOME"} = $ENV{"TASK_WORK"};
907       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
908
909       # TODO: Share a single JOB_WORK volume across all task
910       # containers on a given worker node, and delete it when the job
911       # ends (and, in case that doesn't work, when the next job
912       # starts).
913       #
914       # For now, use the same approach as TASK_WORK above.
915       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
916
917       while (my ($env_key, $env_val) = each %ENV)
918       {
919         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
920           $command .= "--env=\Q$env_key=$env_val\E ";
921         }
922       }
923       $command .= "--env=\QHOME=$ENV{HOME}\E ";
924       $command .= "\Q$docker_hash\E ";
925       $command .= "stdbuf --output=0 --error=0 ";
926       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
927     } else {
928       # Non-docker run
929       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
930       $command .= "stdbuf --output=0 --error=0 ";
931       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
932     }
933
934     my @execargs = ('bash', '-c', $command);
935     srun (\@srunargs, \@execargs, undef, $build_script);
936     # exec() failed, we assume nothing happened.
937     die "srun() failed on build script\n";
938   }
939   close("writer");
940   if (!defined $childpid)
941   {
942     close $reader{$id};
943     delete $reader{$id};
944     next;
945   }
946   shift @freeslot;
947   $proc{$childpid} = { jobstep => $id,
948                        time => time,
949                        slot => $childslot,
950                        jobstepname => "$job_id.$id.$childpid",
951                      };
952   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
953   $slot[$childslot]->{pid} = $childpid;
954
955   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
956   Log ($id, "child $childpid started on $childslotname");
957   $Jobstep->{starttime} = time;
958   $Jobstep->{node} = $childnode->{name};
959   $Jobstep->{slotindex} = $childslot;
960   delete $Jobstep->{stderr};
961   delete $Jobstep->{finishtime};
962   delete $Jobstep->{tempfail};
963
964   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
965   $Jobstep->{'arvados_task'}->save;
966
967   splice @jobstep_todo, $todo_ptr, 1;
968   --$todo_ptr;
969
970   $progress_is_dirty = 1;
971
972   while (!@freeslot
973          ||
974          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
975   {
976     last THISROUND if $main::please_freeze || defined($main::success);
977     if ($main::please_info)
978     {
979       $main::please_info = 0;
980       freeze();
981       create_output_collection();
982       save_meta(1);
983       update_progress_stats();
984     }
985     my $gotsome
986         = readfrompipes ()
987         + reapchildren ();
988     if (!$gotsome)
989     {
990       check_refresh_wanted();
991       check_squeue();
992       update_progress_stats();
993       select (undef, undef, undef, 0.1);
994     }
995     elsif (time - $progress_stats_updated >= 30)
996     {
997       update_progress_stats();
998     }
999     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1000                                         $_->{node}->{hold_count} < 4 } @slot);
1001     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1002         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1003     {
1004       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1005           .($thisround_failed+$thisround_succeeded)
1006           .") -- giving up on this round";
1007       Log (undef, $message);
1008       last THISROUND;
1009     }
1010
1011     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1012     for (my $i=$#freeslot; $i>=0; $i--) {
1013       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1014         push @holdslot, (splice @freeslot, $i, 1);
1015       }
1016     }
1017     for (my $i=$#holdslot; $i>=0; $i--) {
1018       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1019         push @freeslot, (splice @holdslot, $i, 1);
1020       }
1021     }
1022
1023     # give up if no nodes are succeeding
1024     if ($working_slot_count < 1) {
1025       Log(undef, "Every node has failed -- giving up");
1026       last THISROUND;
1027     }
1028   }
1029 }
1030
1031
1032 push @freeslot, splice @holdslot;
1033 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1034
1035
1036 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1037 while (%proc)
1038 {
1039   if ($main::please_continue) {
1040     $main::please_continue = 0;
1041     goto THISROUND;
1042   }
1043   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1044   readfrompipes ();
1045   if (!reapchildren())
1046   {
1047     check_refresh_wanted();
1048     check_squeue();
1049     update_progress_stats();
1050     select (undef, undef, undef, 0.1);
1051     killem (keys %proc) if $main::please_freeze;
1052   }
1053 }
1054
1055 update_progress_stats();
1056 freeze_if_want_freeze();
1057
1058
1059 if (!defined $main::success)
1060 {
1061   if (!@jobstep_todo) {
1062     $main::success = 1;
1063   } elsif ($working_slot_count < 1) {
1064     save_output_collection();
1065     save_meta();
1066     exit(EX_RETRY_UNLOCKED);
1067   } elsif ($thisround_succeeded == 0 &&
1068            ($thisround_failed == 0 || $thisround_failed > 4)) {
1069     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1070     Log (undef, $message);
1071     $main::success = 0;
1072   }
1073 }
1074
1075 goto ONELEVEL if !defined $main::success;
1076
1077
1078 release_allocation();
1079 freeze();
1080 my $collated_output = save_output_collection();
1081 Log (undef, "finish");
1082
1083 save_meta();
1084
1085 my $final_state;
1086 if ($collated_output && $main::success) {
1087   $final_state = 'Complete';
1088 } else {
1089   $final_state = 'Failed';
1090 }
1091 $Job->update_attributes('state' => $final_state);
1092
1093 exit (($final_state eq 'Complete') ? 0 : 1);
1094
1095
1096
1097 sub update_progress_stats
1098 {
1099   $progress_stats_updated = time;
1100   return if !$progress_is_dirty;
1101   my ($todo, $done, $running) = (scalar @jobstep_todo,
1102                                  scalar @jobstep_done,
1103                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1104   $Job->{'tasks_summary'} ||= {};
1105   $Job->{'tasks_summary'}->{'todo'} = $todo;
1106   $Job->{'tasks_summary'}->{'done'} = $done;
1107   $Job->{'tasks_summary'}->{'running'} = $running;
1108   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1109   Log (undef, "status: $done done, $running running, $todo todo");
1110   $progress_is_dirty = 0;
1111 }
1112
1113
1114
1115 sub reapchildren
1116 {
1117   my $pid = waitpid (-1, WNOHANG);
1118   return 0 if $pid <= 0;
1119
1120   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1121                   . "."
1122                   . $slot[$proc{$pid}->{slot}]->{cpu});
1123   my $jobstepid = $proc{$pid}->{jobstep};
1124   my $elapsed = time - $proc{$pid}->{time};
1125   my $Jobstep = $jobstep[$jobstepid];
1126
1127   my $childstatus = $?;
1128   my $exitvalue = $childstatus >> 8;
1129   my $exitinfo = "exit ".exit_status_s($childstatus);
1130   $Jobstep->{'arvados_task'}->reload;
1131   my $task_success = $Jobstep->{'arvados_task'}->{success};
1132
1133   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1134
1135   if (!defined $task_success) {
1136     # task did not indicate one way or the other --> fail
1137     $Jobstep->{'arvados_task'}->{success} = 0;
1138     $Jobstep->{'arvados_task'}->save;
1139     $task_success = 0;
1140   }
1141
1142   if (!$task_success)
1143   {
1144     my $temporary_fail;
1145     $temporary_fail ||= $Jobstep->{tempfail};
1146     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1147
1148     ++$thisround_failed;
1149     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1150
1151     # Check for signs of a failed or misconfigured node
1152     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1153         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1154       # Don't count this against jobstep failure thresholds if this
1155       # node is already suspected faulty and srun exited quickly
1156       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1157           $elapsed < 5) {
1158         Log ($jobstepid, "blaming failure on suspect node " .
1159              $slot[$proc{$pid}->{slot}]->{node}->{name});
1160         $temporary_fail ||= 1;
1161       }
1162       ban_node_by_slot($proc{$pid}->{slot});
1163     }
1164
1165     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1166                              ++$Jobstep->{'failures'},
1167                              $temporary_fail ? 'temporary' : 'permanent',
1168                              $elapsed));
1169
1170     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1171       # Give up on this task, and the whole job
1172       $main::success = 0;
1173     }
1174     # Put this task back on the todo queue
1175     push @jobstep_todo, $jobstepid;
1176     $Job->{'tasks_summary'}->{'failed'}++;
1177   }
1178   else
1179   {
1180     ++$thisround_succeeded;
1181     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1182     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1183     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1184     push @jobstep_done, $jobstepid;
1185     Log ($jobstepid, "success in $elapsed seconds");
1186   }
1187   $Jobstep->{exitcode} = $childstatus;
1188   $Jobstep->{finishtime} = time;
1189   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1190   $Jobstep->{'arvados_task'}->save;
1191   process_stderr ($jobstepid, $task_success);
1192   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1193                            length($Jobstep->{'arvados_task'}->{output}),
1194                            $Jobstep->{'arvados_task'}->{output}));
1195
1196   close $reader{$jobstepid};
1197   delete $reader{$jobstepid};
1198   delete $slot[$proc{$pid}->{slot}]->{pid};
1199   push @freeslot, $proc{$pid}->{slot};
1200   delete $proc{$pid};
1201
1202   if ($task_success) {
1203     # Load new tasks
1204     my $newtask_list = [];
1205     my $newtask_results;
1206     do {
1207       $newtask_results = api_call(
1208         "job_tasks/list",
1209         'where' => {
1210           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1211         },
1212         'order' => 'qsequence',
1213         'offset' => scalar(@$newtask_list),
1214       );
1215       push(@$newtask_list, @{$newtask_results->{items}});
1216     } while (@{$newtask_results->{items}});
1217     foreach my $arvados_task (@$newtask_list) {
1218       my $jobstep = {
1219         'level' => $arvados_task->{'sequence'},
1220         'failures' => 0,
1221         'arvados_task' => $arvados_task
1222       };
1223       push @jobstep, $jobstep;
1224       push @jobstep_todo, $#jobstep;
1225     }
1226   }
1227
1228   $progress_is_dirty = 1;
1229   1;
1230 }
1231
1232 sub check_refresh_wanted
1233 {
1234   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1235   if (@stat && $stat[9] > $latest_refresh) {
1236     $latest_refresh = scalar time;
1237     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1238     for my $attr ('cancelled_at',
1239                   'cancelled_by_user_uuid',
1240                   'cancelled_by_client_uuid',
1241                   'state') {
1242       $Job->{$attr} = $Job2->{$attr};
1243     }
1244     if ($Job->{'state'} ne "Running") {
1245       if ($Job->{'state'} eq "Cancelled") {
1246         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1247       } else {
1248         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1249       }
1250       $main::success = 0;
1251       $main::please_freeze = 1;
1252     }
1253   }
1254 }
1255
1256 sub check_squeue
1257 {
1258   my $last_squeue_check = $squeue_checked;
1259
1260   # Do not call `squeue` or check the kill list more than once every
1261   # 15 seconds.
1262   return if $last_squeue_check > time - 15;
1263   $squeue_checked = time;
1264
1265   # Look for children from which we haven't received stderr data since
1266   # the last squeue check. If no such children exist, all procs are
1267   # alive and there's no need to even look at squeue.
1268   #
1269   # As long as the crunchstat poll interval (10s) is shorter than the
1270   # squeue check interval (15s) this should make the squeue check an
1271   # infrequent event.
1272   my $silent_procs = 0;
1273   for my $jobstep (values %proc)
1274   {
1275     if ($jobstep->{stderr_at} < $last_squeue_check)
1276     {
1277       $silent_procs++;
1278     }
1279   }
1280   return if $silent_procs == 0;
1281
1282   # use killem() on procs whose killtime is reached
1283   while (my ($pid, $jobstep) = each %proc)
1284   {
1285     if (exists $jobstep->{killtime}
1286         && $jobstep->{killtime} <= time
1287         && $jobstep->{stderr_at} < $last_squeue_check)
1288     {
1289       my $sincewhen = "";
1290       if ($jobstep->{stderr_at}) {
1291         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1292       }
1293       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1294       killem ($pid);
1295     }
1296   }
1297
1298   if (!$have_slurm)
1299   {
1300     # here is an opportunity to check for mysterious problems with local procs
1301     return;
1302   }
1303
1304   # Get a list of steps still running.  Note: squeue(1) says --steps
1305   # selects a format (which we override anyway) and allows us to
1306   # specify which steps we're interested in (which we don't).
1307   # Importantly, it also changes the meaning of %j from "job name" to
1308   # "step name" and (although this isn't mentioned explicitly in the
1309   # docs) switches from "one line per job" mode to "one line per step"
1310   # mode. Without it, we'd just get a list of one job, instead of a
1311   # list of N steps.
1312   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1313   if ($? != 0)
1314   {
1315     Log(undef, "warning: squeue exit status $? ($!)");
1316     return;
1317   }
1318   chop @squeue;
1319
1320   # which of my jobsteps are running, according to squeue?
1321   my %ok;
1322   for my $jobstepname (@squeue)
1323   {
1324     $ok{$jobstepname} = 1;
1325   }
1326
1327   # Check for child procs >60s old and not mentioned by squeue.
1328   while (my ($pid, $jobstep) = each %proc)
1329   {
1330     if ($jobstep->{time} < time - 60
1331         && $jobstep->{jobstepname}
1332         && !exists $ok{$jobstep->{jobstepname}}
1333         && !exists $jobstep->{killtime})
1334     {
1335       # According to slurm, this task has ended (successfully or not)
1336       # -- but our srun child hasn't exited. First we must wait (30
1337       # seconds) in case this is just a race between communication
1338       # channels. Then, if our srun child process still hasn't
1339       # terminated, we'll conclude some slurm communication
1340       # error/delay has caused the task to die without notifying srun,
1341       # and we'll kill srun ourselves.
1342       $jobstep->{killtime} = time + 30;
1343       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1344     }
1345   }
1346 }
1347
1348
1349 sub release_allocation
1350 {
1351   if ($have_slurm)
1352   {
1353     Log (undef, "release job allocation");
1354     system "scancel $ENV{SLURM_JOB_ID}";
1355   }
1356 }
1357
1358
1359 sub readfrompipes
1360 {
1361   my $gotsome = 0;
1362   foreach my $job (keys %reader)
1363   {
1364     my $buf;
1365     while (0 < sysread ($reader{$job}, $buf, 8192))
1366     {
1367       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1368       $jobstep[$job]->{stderr_at} = time;
1369       $jobstep[$job]->{stderr} .= $buf;
1370       preprocess_stderr ($job);
1371       if (length ($jobstep[$job]->{stderr}) > 16384)
1372       {
1373         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1374       }
1375       $gotsome = 1;
1376     }
1377   }
1378   return $gotsome;
1379 }
1380
1381
1382 sub preprocess_stderr
1383 {
1384   my $job = shift;
1385
1386   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1387     my $line = $1;
1388     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1389     Log ($job, "stderr $line");
1390     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1391       # whoa.
1392       $main::please_freeze = 1;
1393     }
1394     elsif ($line =~ /srun: error: Node failure on/) {
1395       my $job_slot_index = $jobstep[$job]->{slotindex};
1396       $slot[$job_slot_index]->{node}->{fail_count}++;
1397       $jobstep[$job]->{tempfail} = 1;
1398       ban_node_by_slot($job_slot_index);
1399     }
1400     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1401       $jobstep[$job]->{tempfail} = 1;
1402       ban_node_by_slot($jobstep[$job]->{slotindex});
1403     }
1404     elsif ($line =~ /arvados\.errors\.Keep/) {
1405       $jobstep[$job]->{tempfail} = 1;
1406     }
1407   }
1408 }
1409
1410
1411 sub process_stderr
1412 {
1413   my $job = shift;
1414   my $task_success = shift;
1415   preprocess_stderr ($job);
1416
1417   map {
1418     Log ($job, "stderr $_");
1419   } split ("\n", $jobstep[$job]->{stderr});
1420 }
1421
1422 sub fetch_block
1423 {
1424   my $hash = shift;
1425   my $keep;
1426   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1427     Log(undef, "fetch_block run error from arv-get $hash: $!");
1428     return undef;
1429   }
1430   my $output_block = "";
1431   while (1) {
1432     my $buf;
1433     my $bytes = sysread($keep, $buf, 1024 * 1024);
1434     if (!defined $bytes) {
1435       Log(undef, "fetch_block read error from arv-get: $!");
1436       $output_block = undef;
1437       last;
1438     } elsif ($bytes == 0) {
1439       # sysread returns 0 at the end of the pipe.
1440       last;
1441     } else {
1442       # some bytes were read into buf.
1443       $output_block .= $buf;
1444     }
1445   }
1446   close $keep;
1447   if ($?) {
1448     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1449     $output_block = undef;
1450   }
1451   return $output_block;
1452 }
1453
1454 # Create a collection by concatenating the output of all tasks (each
1455 # task's output is either a manifest fragment, a locator for a
1456 # manifest fragment stored in Keep, or nothing at all). Return the
1457 # portable_data_hash of the new collection.
1458 sub create_output_collection
1459 {
1460   Log (undef, "collate");
1461
1462   my ($child_out, $child_in);
1463   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1464 import arvados
1465 import sys
1466 print (arvados.api("v1").collections().
1467        create(body={"manifest_text": sys.stdin.read()}).
1468        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1469 }, retry_count());
1470
1471   my $task_idx = -1;
1472   my $manifest_size = 0;
1473   for (@jobstep)
1474   {
1475     ++$task_idx;
1476     my $output = $_->{'arvados_task'}->{output};
1477     next if (!defined($output));
1478     my $next_write;
1479     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1480       $next_write = fetch_block($output);
1481     } else {
1482       $next_write = $output;
1483     }
1484     if (defined($next_write)) {
1485       if (!defined(syswrite($child_in, $next_write))) {
1486         # There's been an error writing.  Stop the loop.
1487         # We'll log details about the exit code later.
1488         last;
1489       } else {
1490         $manifest_size += length($next_write);
1491       }
1492     } else {
1493       my $uuid = $_->{'arvados_task'}->{'uuid'};
1494       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1495       $main::success = 0;
1496     }
1497   }
1498   close($child_in);
1499   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1500
1501   my $joboutput;
1502   my $s = IO::Select->new($child_out);
1503   if ($s->can_read(120)) {
1504     sysread($child_out, $joboutput, 1024 * 1024);
1505     waitpid($pid, 0);
1506     if ($?) {
1507       Log(undef, "output collection creation exited " . exit_status_s($?));
1508       $joboutput = undef;
1509     } else {
1510       chomp($joboutput);
1511     }
1512   } else {
1513     Log (undef, "timed out while creating output collection");
1514     foreach my $signal (2, 2, 2, 15, 15, 9) {
1515       kill($signal, $pid);
1516       last if waitpid($pid, WNOHANG) == -1;
1517       sleep(1);
1518     }
1519   }
1520   close($child_out);
1521
1522   return $joboutput;
1523 }
1524
1525 # Calls create_output_collection, logs the result, and returns it.
1526 # If that was successful, save that as the output in the job record.
1527 sub save_output_collection {
1528   my $collated_output = create_output_collection();
1529
1530   if (!$collated_output) {
1531     Log(undef, "Failed to write output collection");
1532   }
1533   else {
1534     Log(undef, "job output $collated_output");
1535     $Job->update_attributes('output' => $collated_output);
1536   }
1537   return $collated_output;
1538 }
1539
1540 sub killem
1541 {
1542   foreach (@_)
1543   {
1544     my $sig = 2;                # SIGINT first
1545     if (exists $proc{$_}->{"sent_$sig"} &&
1546         time - $proc{$_}->{"sent_$sig"} > 4)
1547     {
1548       $sig = 15;                # SIGTERM if SIGINT doesn't work
1549     }
1550     if (exists $proc{$_}->{"sent_$sig"} &&
1551         time - $proc{$_}->{"sent_$sig"} > 4)
1552     {
1553       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1554     }
1555     if (!exists $proc{$_}->{"sent_$sig"})
1556     {
1557       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1558       kill $sig, $_;
1559       select (undef, undef, undef, 0.1);
1560       if ($sig == 2)
1561       {
1562         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1563       }
1564       $proc{$_}->{"sent_$sig"} = time;
1565       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1566     }
1567   }
1568 }
1569
1570
1571 sub fhbits
1572 {
1573   my($bits);
1574   for (@_) {
1575     vec($bits,fileno($_),1) = 1;
1576   }
1577   $bits;
1578 }
1579
1580
1581 # Send log output to Keep via arv-put.
1582 #
1583 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1584 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1585 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1586 # $log_pipe_pid is the pid of the arv-put subprocess.
1587 #
1588 # The only functions that should access these variables directly are:
1589 #
1590 # log_writer_start($logfilename)
1591 #     Starts an arv-put pipe, reading data on stdin and writing it to
1592 #     a $logfilename file in an output collection.
1593 #
1594 # log_writer_read_output([$timeout])
1595 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1596 #     Passes $timeout to the select() call, with a default of 0.01.
1597 #     Returns the result of the last read() call on $log_pipe_out, or
1598 #     -1 if read() wasn't called because select() timed out.
1599 #     Only other log_writer_* functions should need to call this.
1600 #
1601 # log_writer_send($txt)
1602 #     Writes $txt to the output log collection.
1603 #
1604 # log_writer_finish()
1605 #     Closes the arv-put pipe and returns the output that it produces.
1606 #
1607 # log_writer_is_active()
1608 #     Returns a true value if there is currently a live arv-put
1609 #     process, false otherwise.
1610 #
1611 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1612     $log_pipe_pid);
1613
1614 sub log_writer_start($)
1615 {
1616   my $logfilename = shift;
1617   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1618                         'arv-put',
1619                         '--stream',
1620                         '--retries', '3',
1621                         '--filename', $logfilename,
1622                         '-');
1623   $log_pipe_out_buf = "";
1624   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1625 }
1626
1627 sub log_writer_read_output {
1628   my $timeout = shift || 0.01;
1629   my $read = -1;
1630   while ($read && $log_pipe_out_select->can_read($timeout)) {
1631     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1632                  length($log_pipe_out_buf));
1633   }
1634   if (!defined($read)) {
1635     Log(undef, "error reading log manifest from arv-put: $!");
1636   }
1637   return $read;
1638 }
1639
1640 sub log_writer_send($)
1641 {
1642   my $txt = shift;
1643   print $log_pipe_in $txt;
1644   log_writer_read_output();
1645 }
1646
1647 sub log_writer_finish()
1648 {
1649   return unless $log_pipe_pid;
1650
1651   close($log_pipe_in);
1652
1653   my $read_result = log_writer_read_output(120);
1654   if ($read_result == -1) {
1655     Log (undef, "timed out reading from 'arv-put'");
1656   } elsif ($read_result != 0) {
1657     Log(undef, "failed to read arv-put log manifest to EOF");
1658   }
1659
1660   waitpid($log_pipe_pid, 0);
1661   if ($?) {
1662     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1663   }
1664
1665   close($log_pipe_out);
1666   my $arv_put_output = $log_pipe_out_buf;
1667   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1668       $log_pipe_out_select = undef;
1669
1670   return $arv_put_output;
1671 }
1672
1673 sub log_writer_is_active() {
1674   return $log_pipe_pid;
1675 }
1676
1677 sub Log                         # ($jobstep_id, $logmessage)
1678 {
1679   if ($_[1] =~ /\n/) {
1680     for my $line (split (/\n/, $_[1])) {
1681       Log ($_[0], $line);
1682     }
1683     return;
1684   }
1685   my $fh = select STDERR; $|=1; select $fh;
1686   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1687   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1688   $message .= "\n";
1689   my $datetime;
1690   if (log_writer_is_active() || -t STDERR) {
1691     my @gmtime = gmtime;
1692     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1693                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1694   }
1695   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1696
1697   if (log_writer_is_active()) {
1698     log_writer_send($datetime . " " . $message);
1699   }
1700 }
1701
1702
1703 sub croak
1704 {
1705   my ($package, $file, $line) = caller;
1706   my $message = "@_ at $file line $line\n";
1707   Log (undef, $message);
1708   freeze() if @jobstep_todo;
1709   create_output_collection() if @jobstep_todo;
1710   cleanup();
1711   save_meta();
1712   die;
1713 }
1714
1715
1716 sub cleanup
1717 {
1718   return unless $Job;
1719   if ($Job->{'state'} eq 'Cancelled') {
1720     $Job->update_attributes('finished_at' => scalar gmtime);
1721   } else {
1722     $Job->update_attributes('state' => 'Failed');
1723   }
1724 }
1725
1726
1727 sub save_meta
1728 {
1729   my $justcheckpoint = shift; # false if this will be the last meta saved
1730   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1731   return unless log_writer_is_active();
1732
1733   my $log_manifest = "";
1734   if ($Job->{log}) {
1735     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1736     $log_manifest .= $prev_log_coll->{manifest_text};
1737   }
1738   $log_manifest .= log_writer_finish();
1739
1740   my $log_coll = api_call(
1741     "collections/create", ensure_unique_name => 1, collection => {
1742       manifest_text => $log_manifest,
1743       owner_uuid => $Job->{owner_uuid},
1744       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1745     });
1746   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1747   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1748 }
1749
1750
1751 sub freeze_if_want_freeze
1752 {
1753   if ($main::please_freeze)
1754   {
1755     release_allocation();
1756     if (@_)
1757     {
1758       # kill some srun procs before freeze+stop
1759       map { $proc{$_} = {} } @_;
1760       while (%proc)
1761       {
1762         killem (keys %proc);
1763         select (undef, undef, undef, 0.1);
1764         my $died;
1765         while (($died = waitpid (-1, WNOHANG)) > 0)
1766         {
1767           delete $proc{$died};
1768         }
1769       }
1770     }
1771     freeze();
1772     create_output_collection();
1773     cleanup();
1774     save_meta();
1775     exit 1;
1776   }
1777 }
1778
1779
1780 sub freeze
1781 {
1782   Log (undef, "Freeze not implemented");
1783   return;
1784 }
1785
1786
1787 sub thaw
1788 {
1789   croak ("Thaw not implemented");
1790 }
1791
1792
1793 sub freezequote
1794 {
1795   my $s = shift;
1796   $s =~ s/\\/\\\\/g;
1797   $s =~ s/\n/\\n/g;
1798   return $s;
1799 }
1800
1801
1802 sub freezeunquote
1803 {
1804   my $s = shift;
1805   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1806   return $s;
1807 }
1808
1809
1810 sub srun
1811 {
1812   my $srunargs = shift;
1813   my $execargs = shift;
1814   my $opts = shift || {};
1815   my $stdin = shift;
1816   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1817
1818   $Data::Dumper::Terse = 1;
1819   $Data::Dumper::Indent = 0;
1820   my $show_cmd = Dumper($args);
1821   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1822   $show_cmd =~ s/\n/ /g;
1823   if ($opts->{fork}) {
1824     Log(undef, "starting: $show_cmd");
1825   } else {
1826     # This is a child process: parent is in charge of reading our
1827     # stderr and copying it to Log() if needed.
1828     warn "starting: $show_cmd\n";
1829   }
1830
1831   if (defined $stdin) {
1832     my $child = open STDIN, "-|";
1833     defined $child or die "no fork: $!";
1834     if ($child == 0) {
1835       print $stdin or die $!;
1836       close STDOUT or die $!;
1837       exit 0;
1838     }
1839   }
1840
1841   return system (@$args) if $opts->{fork};
1842
1843   exec @$args;
1844   warn "ENV size is ".length(join(" ",%ENV));
1845   die "exec failed: $!: @$args";
1846 }
1847
1848
1849 sub ban_node_by_slot {
1850   # Don't start any new jobsteps on this node for 60 seconds
1851   my $slotid = shift;
1852   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1853   $slot[$slotid]->{node}->{hold_count}++;
1854   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1855 }
1856
1857 sub must_lock_now
1858 {
1859   my ($lockfile, $error_message) = @_;
1860   open L, ">", $lockfile or croak("$lockfile: $!");
1861   if (!flock L, LOCK_EX|LOCK_NB) {
1862     croak("Can't lock $lockfile: $error_message\n");
1863   }
1864 }
1865
1866 sub find_docker_image {
1867   # Given a Keep locator, check to see if it contains a Docker image.
1868   # If so, return its stream name and Docker hash.
1869   # If not, return undef for both values.
1870   my $locator = shift;
1871   my ($streamname, $filename);
1872   my $image = api_call("collections/get", uuid => $locator);
1873   if ($image) {
1874     foreach my $line (split(/\n/, $image->{manifest_text})) {
1875       my @tokens = split(/\s+/, $line);
1876       next if (!@tokens);
1877       $streamname = shift(@tokens);
1878       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1879         if (defined($filename)) {
1880           return (undef, undef);  # More than one file in the Collection.
1881         } else {
1882           $filename = (split(/:/, $filedata, 3))[2];
1883         }
1884       }
1885     }
1886   }
1887   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1888     return ($streamname, $1);
1889   } else {
1890     return (undef, undef);
1891   }
1892 }
1893
1894 sub retry_count {
1895   # Calculate the number of times an operation should be retried,
1896   # assuming exponential backoff, and that we're willing to retry as
1897   # long as tasks have been running.  Enforce a minimum of 3 retries.
1898   my ($starttime, $endtime, $timediff, $retries);
1899   if (@jobstep) {
1900     $starttime = $jobstep[0]->{starttime};
1901     $endtime = $jobstep[-1]->{finishtime};
1902   }
1903   if (!defined($starttime)) {
1904     $timediff = 0;
1905   } elsif (!defined($endtime)) {
1906     $timediff = time - $starttime;
1907   } else {
1908     $timediff = ($endtime - $starttime) - (time - $endtime);
1909   }
1910   if ($timediff > 0) {
1911     $retries = int(log($timediff) / log(2));
1912   } else {
1913     $retries = 1;  # Use the minimum.
1914   }
1915   return ($retries > 3) ? $retries : 3;
1916 }
1917
1918 sub retry_op {
1919   # Pass in two function references.
1920   # This method will be called with the remaining arguments.
1921   # If it dies, retry it with exponential backoff until it succeeds,
1922   # or until the current retry_count is exhausted.  After each failure
1923   # that can be retried, the second function will be called with
1924   # the current try count (0-based), next try time, and error message.
1925   my $operation = shift;
1926   my $retry_callback = shift;
1927   my $retries = retry_count();
1928   foreach my $try_count (0..$retries) {
1929     my $next_try = time + (2 ** $try_count);
1930     my $result = eval { $operation->(@_); };
1931     if (!$@) {
1932       return $result;
1933     } elsif ($try_count < $retries) {
1934       $retry_callback->($try_count, $next_try, $@);
1935       my $sleep_time = $next_try - time;
1936       sleep($sleep_time) if ($sleep_time > 0);
1937     }
1938   }
1939   # Ensure the error message ends in a newline, so Perl doesn't add
1940   # retry_op's line number to it.
1941   chomp($@);
1942   die($@ . "\n");
1943 }
1944
1945 sub api_call {
1946   # Pass in a /-separated API method name, and arguments for it.
1947   # This function will call that method, retrying as needed until
1948   # the current retry_count is exhausted, with a log on the first failure.
1949   my $method_name = shift;
1950   my $log_api_retry = sub {
1951     my ($try_count, $next_try_at, $errmsg) = @_;
1952     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1953     $errmsg =~ s/\s/ /g;
1954     $errmsg =~ s/\s+$//;
1955     my $retry_msg;
1956     if ($next_try_at < time) {
1957       $retry_msg = "Retrying.";
1958     } else {
1959       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1960       $retry_msg = "Retrying at $next_try_fmt.";
1961     }
1962     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1963   };
1964   my $method = $arv;
1965   foreach my $key (split(/\//, $method_name)) {
1966     $method = $method->{$key};
1967   }
1968   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1969 }
1970
1971 sub exit_status_s {
1972   # Given a $?, return a human-readable exit code string like "0" or
1973   # "1" or "0 with signal 1" or "1 with signal 11".
1974   my $exitcode = shift;
1975   my $s = $exitcode >> 8;
1976   if ($exitcode & 0x7f) {
1977     $s .= " with signal " . ($exitcode & 0x7f);
1978   }
1979   if ($exitcode & 0x80) {
1980     $s .= " with core dump";
1981   }
1982   return $s;
1983 }
1984
1985 sub handle_readall {
1986   # Pass in a glob reference to a file handle.
1987   # Read all its contents and return them as a string.
1988   my $fh_glob_ref = shift;
1989   local $/ = undef;
1990   return <$fh_glob_ref>;
1991 }
1992
1993 sub tar_filename_n {
1994   my $n = shift;
1995   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1996 }
1997
1998 sub add_git_archive {
1999   # Pass in a git archive command as a string or list, a la system().
2000   # This method will save its output to be included in the archive sent to the
2001   # build script.
2002   my $git_input;
2003   $git_tar_count++;
2004   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2005     croak("Failed to save git archive: $!");
2006   }
2007   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2008   close($git_input);
2009   waitpid($git_pid, 0);
2010   close(GIT_ARCHIVE);
2011   if ($?) {
2012     croak("Failed to save git archive: git exited " . exit_status_s($?));
2013   }
2014 }
2015
2016 sub combined_git_archive {
2017   # Combine all saved tar archives into a single archive, then return its
2018   # contents in a string.  Return undef if no archives have been saved.
2019   if ($git_tar_count < 1) {
2020     return undef;
2021   }
2022   my $base_tar_name = tar_filename_n(1);
2023   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2024     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2025     if ($tar_exit != 0) {
2026       croak("Error preparing build archive: tar -A exited " .
2027             exit_status_s($tar_exit));
2028     }
2029   }
2030   if (!open(GIT_TAR, "<", $base_tar_name)) {
2031     croak("Could not open build archive: $!");
2032   }
2033   my $tar_contents = handle_readall(\*GIT_TAR);
2034   close(GIT_TAR);
2035   return $tar_contents;
2036 }
2037
2038 sub set_nonblocking {
2039   my $fh = shift;
2040   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2041   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2042 }
2043
2044 __DATA__
2045 #!/usr/bin/perl
2046 #
2047 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2048 # server invokes this script on individual compute nodes, or localhost if we're
2049 # running a job locally.  It gets called in two modes:
2050 #
2051 # * No arguments: Installation mode.  Read a tar archive from the DATA
2052 #   file handle; it includes the Crunch script's source code, and
2053 #   maybe SDKs as well.  Those should be installed in the proper
2054 #   locations.  This runs outside of any Docker container, so don't try to
2055 #   introspect Crunch's runtime environment.
2056 #
2057 # * With arguments: Crunch script run mode.  This script should set up the
2058 #   environment, then run the command specified in the arguments.  This runs
2059 #   inside any Docker container.
2060
2061 use Fcntl ':flock';
2062 use File::Path qw( make_path remove_tree );
2063 use POSIX qw(getcwd);
2064
2065 use constant TASK_TEMPFAIL => 111;
2066
2067 # Map SDK subdirectories to the path environments they belong to.
2068 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2069
2070 my $destdir = $ENV{"CRUNCH_SRC"};
2071 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2072 my $repo = $ENV{"CRUNCH_SRC_URL"};
2073 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2074 my $job_work = $ENV{"JOB_WORK"};
2075 my $task_work = $ENV{"TASK_WORK"};
2076
2077 open(STDOUT_ORIG, ">&", STDOUT);
2078 open(STDERR_ORIG, ">&", STDERR);
2079
2080 for my $dir ($destdir, $job_work, $task_work) {
2081   if ($dir) {
2082     make_path $dir;
2083     -e $dir or die "Failed to create temporary directory ($dir): $!";
2084   }
2085 }
2086
2087 if ($task_work) {
2088   remove_tree($task_work, {keep_root => 1});
2089 }
2090
2091 ### Crunch script run mode
2092 if (@ARGV) {
2093   # We want to do routine logging during task 0 only.  This gives the user
2094   # the information they need, but avoids repeating the information for every
2095   # task.
2096   my $Log;
2097   if ($ENV{TASK_SEQUENCE} eq "0") {
2098     $Log = sub {
2099       my $msg = shift;
2100       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2101     };
2102   } else {
2103     $Log = sub { };
2104   }
2105
2106   my $python_src = "$install_dir/python";
2107   my $venv_dir = "$job_work/.arvados.venv";
2108   my $venv_built = -e "$venv_dir/bin/activate";
2109   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2110     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2111                  "--python=python2.7", $venv_dir);
2112     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2113     $venv_built = 1;
2114     $Log->("Built Python SDK virtualenv");
2115   }
2116
2117   my $pip_bin = "pip";
2118   if ($venv_built) {
2119     $Log->("Running in Python SDK virtualenv");
2120     $pip_bin = "$venv_dir/bin/pip";
2121     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2122     @ARGV = ("/bin/sh", "-ec",
2123              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2124   } elsif (-d $python_src) {
2125     $Log->("Warning: virtualenv not found inside Docker container default " .
2126            "\$PATH. Can't install Python SDK.");
2127   }
2128
2129   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2130   if ($pkgs) {
2131     $Log->("Using Arvados SDK:");
2132     foreach my $line (split /\n/, $pkgs) {
2133       $Log->($line);
2134     }
2135   } else {
2136     $Log->("Arvados SDK packages not found");
2137   }
2138
2139   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2140     my $sdk_path = "$install_dir/$sdk_dir";
2141     if (-d $sdk_path) {
2142       if ($ENV{$sdk_envkey}) {
2143         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2144       } else {
2145         $ENV{$sdk_envkey} = $sdk_path;
2146       }
2147       $Log->("Arvados SDK added to %s", $sdk_envkey);
2148     }
2149   }
2150
2151   exec(@ARGV);
2152   die "Cannot exec `@ARGV`: $!";
2153 }
2154
2155 ### Installation mode
2156 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2157 flock L, LOCK_EX;
2158 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2159   # This exact git archive (source + arvados sdk) is already installed
2160   # here, so there's no need to reinstall it.
2161
2162   # We must consume our DATA section, though: otherwise the process
2163   # feeding it to us will get SIGPIPE.
2164   my $buf;
2165   while (read(DATA, $buf, 65536)) { }
2166
2167   exit(0);
2168 }
2169
2170 unlink "$destdir.archive_hash";
2171 mkdir $destdir;
2172
2173 do {
2174   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2175   local $SIG{PIPE} = "IGNORE";
2176   warn "Extracting archive: $archive_hash\n";
2177   # --ignore-zeros is necessary sometimes: depending on how much NUL
2178   # padding tar -A put on our combined archive (which in turn depends
2179   # on the length of the component archives) tar without
2180   # --ignore-zeros will exit before consuming stdin and cause close()
2181   # to fail on the resulting SIGPIPE.
2182   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2183     die "Error launching 'tar -xC $destdir': $!";
2184   }
2185   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2186   # get SIGPIPE.  We must feed it data incrementally.
2187   my $tar_input;
2188   while (read(DATA, $tar_input, 65536)) {
2189     print TARX $tar_input;
2190   }
2191   if(!close(TARX)) {
2192     die "'tar -xC $destdir' exited $?: $!";
2193   }
2194 };
2195
2196 mkdir $install_dir;
2197
2198 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2199 if (-d $sdk_root) {
2200   foreach my $sdk_lang (("python",
2201                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2202     if (-d "$sdk_root/$sdk_lang") {
2203       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2204         die "Failed to install $sdk_lang SDK: $!";
2205       }
2206     }
2207   }
2208 }
2209
2210 my $python_dir = "$install_dir/python";
2211 if ((-d $python_dir) and can_run("python2.7")) {
2212   open(my $egg_info_pipe, "-|",
2213        "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null");
2214   my @egg_info_errors = <$egg_info_pipe>;
2215   close($egg_info_pipe);
2216   if ($?) {
2217     if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) {
2218       # egg_info apparently failed because it couldn't ask git for a build tag.
2219       # Specify no build tag.
2220       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2221       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2222       close($pysdk_cfg);
2223     } else {
2224       my $egg_info_exit = $? >> 8;
2225       foreach my $errline (@egg_info_errors) {
2226         print STDERR_ORIG $errline;
2227       }
2228       warn "python setup.py egg_info failed: exit $egg_info_exit";
2229       exit ($egg_info_exit || 1);
2230     }
2231   }
2232 }
2233
2234 # Hide messages from the install script (unless it fails: shell_or_die
2235 # will show $destdir.log in that case).
2236 open(STDOUT, ">>", "$destdir.log");
2237 open(STDERR, ">&", STDOUT);
2238
2239 if (-e "$destdir/crunch_scripts/install") {
2240     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2241 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2242     # Old version
2243     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2244 } elsif (-e "./install.sh") {
2245     shell_or_die (undef, "./install.sh", $install_dir);
2246 }
2247
2248 if ($archive_hash) {
2249     unlink "$destdir.archive_hash.new";
2250     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2251     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2252 }
2253
2254 close L;
2255
2256 sub can_run {
2257   my $command_name = shift;
2258   open(my $which, "-|", "which", $command_name);
2259   while (<$which>) { }
2260   close($which);
2261   return ($? == 0);
2262 }
2263
2264 sub shell_or_die
2265 {
2266   my $exitcode = shift;
2267
2268   if ($ENV{"DEBUG"}) {
2269     print STDERR "@_\n";
2270   }
2271   if (system (@_) != 0) {
2272     my $err = $!;
2273     my $code = $?;
2274     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2275     open STDERR, ">&STDERR_ORIG";
2276     system ("cat $destdir.log >&2");
2277     warn "@_ failed ($err): $exitstatus";
2278     if (defined($exitcode)) {
2279       exit $exitcode;
2280     }
2281     else {
2282       exit (($code >> 8) || 1);
2283     }
2284   }
2285 }
2286
2287 __DATA__