Merge branch '7263-better-busy-behavior' refs #7263
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 my $docker_run_args = "";
131 GetOptions('force-unlock' => \$force_unlock,
132            'git-dir=s' => \$git_dir,
133            'job=s' => \$jobspec,
134            'job-api-token=s' => \$job_api_token,
135            'no-clear-tmp' => \$no_clear_tmp,
136            'resume-stash=s' => \$resume_stash,
137            'docker-bin=s' => \$docker_bin,
138            'docker-run-args=s' => \$docker_run_args,
139     );
140
141 if (defined $job_api_token) {
142   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
143 }
144
145 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
146
147
148 $SIG{'USR1'} = sub
149 {
150   $main::ENV{CRUNCH_DEBUG} = 1;
151 };
152 $SIG{'USR2'} = sub
153 {
154   $main::ENV{CRUNCH_DEBUG} = 0;
155 };
156
157 my $arv = Arvados->new('apiVersion' => 'v1');
158
159 my $Job;
160 my $job_id;
161 my $dbh;
162 my $sth;
163 my @jobstep;
164
165 my $local_job;
166 if ($jobspec =~ /^[-a-z\d]+$/)
167 {
168   # $jobspec is an Arvados UUID, not a JSON job specification
169   $Job = api_call("jobs/get", uuid => $jobspec);
170   $local_job = 0;
171 }
172 else
173 {
174   $local_job = JSON::decode_json($jobspec);
175 }
176
177
178 # Make sure our workers (our slurm nodes, localhost, or whatever) are
179 # at least able to run basic commands: they aren't down or severely
180 # misconfigured.
181 my $cmd = ['true'];
182 if (($Job || $local_job)->{docker_image_locator}) {
183   $cmd = [$docker_bin, 'ps', '-q'];
184 }
185 Log(undef, "Sanity check is `@$cmd`");
186 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
187      $cmd,
188      {fork => 1});
189 if ($? != 0) {
190   Log(undef, "Sanity check failed: ".exit_status_s($?));
191   exit EX_TEMPFAIL;
192 }
193 Log(undef, "Sanity check OK");
194
195
196 my $User = api_call("users/current");
197
198 if (!$local_job) {
199   if (!$force_unlock) {
200     # Claim this job, and make sure nobody else does
201     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
202     if ($@) {
203       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
204       exit EX_TEMPFAIL;
205     };
206   }
207 }
208 else
209 {
210   if (!$resume_stash)
211   {
212     map { croak ("No $_ specified") unless $local_job->{$_} }
213     qw(script script_version script_parameters);
214   }
215
216   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
217   $local_job->{'started_at'} = gmtime;
218   $local_job->{'state'} = 'Running';
219
220   $Job = api_call("jobs/create", job => $local_job);
221 }
222 $job_id = $Job->{'uuid'};
223
224 my $keep_logfile = $job_id . '.log.txt';
225 log_writer_start($keep_logfile);
226
227 $Job->{'runtime_constraints'} ||= {};
228 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
229 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
230
231 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
232 if ($? == 0) {
233   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
234   chomp($gem_versions);
235   chop($gem_versions);  # Closing parentheses
236 } else {
237   $gem_versions = "";
238 }
239 Log(undef,
240     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
241
242 Log (undef, "check slurm allocation");
243 my @slot;
244 my @node;
245 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
246 my @sinfo;
247 if (!$have_slurm)
248 {
249   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
250   push @sinfo, "$localcpus localhost";
251 }
252 if (exists $ENV{SLURM_NODELIST})
253 {
254   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
255 }
256 foreach (@sinfo)
257 {
258   my ($ncpus, $slurm_nodelist) = split;
259   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
260
261   my @nodelist;
262   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
263   {
264     my $nodelist = $1;
265     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
266     {
267       my $ranges = $1;
268       foreach (split (",", $ranges))
269       {
270         my ($a, $b);
271         if (/(\d+)-(\d+)/)
272         {
273           $a = $1;
274           $b = $2;
275         }
276         else
277         {
278           $a = $_;
279           $b = $_;
280         }
281         push @nodelist, map {
282           my $n = $nodelist;
283           $n =~ s/\[[-,\d]+\]/$_/;
284           $n;
285         } ($a..$b);
286       }
287     }
288     else
289     {
290       push @nodelist, $nodelist;
291     }
292   }
293   foreach my $nodename (@nodelist)
294   {
295     Log (undef, "node $nodename - $ncpus slots");
296     my $node = { name => $nodename,
297                  ncpus => $ncpus,
298                  # The number of consecutive times a task has been dispatched
299                  # to this node and failed.
300                  losing_streak => 0,
301                  # The number of consecutive times that SLURM has reported
302                  # a node failure since the last successful task.
303                  fail_count => 0,
304                  # Don't dispatch work to this node until this time
305                  # (in seconds since the epoch) has passed.
306                  hold_until => 0 };
307     foreach my $cpu (1..$ncpus)
308     {
309       push @slot, { node => $node,
310                     cpu => $cpu };
311     }
312   }
313   push @node, @nodelist;
314 }
315
316
317
318 # Ensure that we get one jobstep running on each allocated node before
319 # we start overloading nodes with concurrent steps
320
321 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
322
323
324 $Job->update_attributes(
325   'tasks_summary' => { 'failed' => 0,
326                        'todo' => 1,
327                        'running' => 0,
328                        'done' => 0 });
329
330 Log (undef, "start");
331 $SIG{'INT'} = sub { $main::please_freeze = 1; };
332 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
333 $SIG{'TERM'} = \&croak;
334 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
335 $SIG{'ALRM'} = sub { $main::please_info = 1; };
336 $SIG{'CONT'} = sub { $main::please_continue = 1; };
337 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
338
339 $main::please_freeze = 0;
340 $main::please_info = 0;
341 $main::please_continue = 0;
342 $main::please_refresh = 0;
343 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
344
345 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
346 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
347 $ENV{"JOB_UUID"} = $job_id;
348
349
350 my @jobstep_todo = ();
351 my @jobstep_done = ();
352 my @jobstep_tomerge = ();
353 my $jobstep_tomerge_level = 0;
354 my $squeue_checked = 0;
355 my $latest_refresh = scalar time;
356
357
358
359 if (defined $Job->{thawedfromkey})
360 {
361   thaw ($Job->{thawedfromkey});
362 }
363 else
364 {
365   my $first_task = api_call("job_tasks/create", job_task => {
366     'job_uuid' => $Job->{'uuid'},
367     'sequence' => 0,
368     'qsequence' => 0,
369     'parameters' => {},
370   });
371   push @jobstep, { 'level' => 0,
372                    'failures' => 0,
373                    'arvados_task' => $first_task,
374                  };
375   push @jobstep_todo, 0;
376 }
377
378
379 if (!$have_slurm)
380 {
381   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
382 }
383
384 my $build_script = handle_readall(\*DATA);
385 my $nodelist = join(",", @node);
386 my $git_tar_count = 0;
387
388 if (!defined $no_clear_tmp) {
389   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
390   Log (undef, "Clean work dirs");
391
392   my $cleanpid = fork();
393   if ($cleanpid == 0)
394   {
395     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
396     # Then clean up work directories.
397     # TODO: When #5036 is done and widely deployed, we can limit mount's
398     # -t option to simply fuse.keep.
399     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
400           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
401     exit (1);
402   }
403   while (1)
404   {
405     last if $cleanpid == waitpid (-1, WNOHANG);
406     freeze_if_want_freeze ($cleanpid);
407     select (undef, undef, undef, 0.1);
408   }
409   if ($?) {
410     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
411     exit(EX_RETRY_UNLOCKED);
412   }
413 }
414
415 # If this job requires a Docker image, install that.
416 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
417 if ($docker_locator = $Job->{docker_image_locator}) {
418   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
419   if (!$docker_hash)
420   {
421     croak("No Docker image hash found from locator $docker_locator");
422   }
423   $docker_stream =~ s/^\.//;
424   my $docker_install_script = qq{
425 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
426     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
427 fi
428 };
429   my $docker_pid = fork();
430   if ($docker_pid == 0)
431   {
432     srun (["srun", "--nodelist=" . join(',', @node)],
433           ["/bin/sh", "-ec", $docker_install_script]);
434     exit ($?);
435   }
436   while (1)
437   {
438     last if $docker_pid == waitpid (-1, WNOHANG);
439     freeze_if_want_freeze ($docker_pid);
440     select (undef, undef, undef, 0.1);
441   }
442   if ($? != 0)
443   {
444     croak("Installing Docker image from $docker_locator exited "
445           .exit_status_s($?));
446   }
447
448   # Determine whether this version of Docker supports memory+swap limits.
449   srun(["srun", "--nodelist=" . $node[0]],
450        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
451       {fork => 1});
452   $docker_limitmem = ($? == 0);
453
454   # Find a non-root Docker user to use.
455   # Tries the default user for the container, then 'crunch', then 'nobody',
456   # testing for whether the actual user id is non-zero.  This defends against
457   # mistakes but not malice, but we intend to harden the security in the future
458   # so we don't want anyone getting used to their jobs running as root in their
459   # Docker containers.
460   my @tryusers = ("", "crunch", "nobody");
461   foreach my $try_user (@tryusers) {
462     my $try_user_arg;
463     if ($try_user eq "") {
464       Log(undef, "Checking if container default user is not UID 0");
465       $try_user_arg = "";
466     } else {
467       Log(undef, "Checking if user '$try_user' is not UID 0");
468       $try_user_arg = "--user=$try_user";
469     }
470     srun(["srun", "--nodelist=" . $node[0]],
471          ["/bin/sh", "-ec",
472           "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
473           " test \$a -ne 0"],
474          {fork => 1});
475     if ($? == 0) {
476       $dockeruserarg = $try_user_arg;
477       if ($try_user eq "") {
478         Log(undef, "Container will run with default user");
479       } else {
480         Log(undef, "Container will run with $dockeruserarg");
481       }
482       last;
483     }
484   }
485
486   if (!defined $dockeruserarg) {
487     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
488   }
489
490   if ($Job->{arvados_sdk_version}) {
491     # The job also specifies an Arvados SDK version.  Add the SDKs to the
492     # tar file for the build script to install.
493     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
494                        $Job->{arvados_sdk_version}));
495     add_git_archive("git", "--git-dir=$git_dir", "archive",
496                     "--prefix=.arvados.sdk/",
497                     $Job->{arvados_sdk_version}, "sdk");
498   }
499 }
500
501 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
502   # If script_version looks like an absolute path, *and* the --git-dir
503   # argument was not given -- which implies we were not invoked by
504   # crunch-dispatch -- we will use the given path as a working
505   # directory instead of resolving script_version to a git commit (or
506   # doing anything else with git).
507   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
508   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
509 }
510 else {
511   # Resolve the given script_version to a git commit sha1. Also, if
512   # the repository is remote, clone it into our local filesystem: this
513   # ensures "git archive" will work, and is necessary to reliably
514   # resolve a symbolic script_version like "master^".
515   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
516
517   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
518
519   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
520
521   # If we're running under crunch-dispatch, it will have already
522   # pulled the appropriate source tree into its own repository, and
523   # given us that repo's path as $git_dir.
524   #
525   # If we're running a "local" job, we might have to fetch content
526   # from a remote repository.
527   #
528   # (Currently crunch-dispatch gives a local path with --git-dir, but
529   # we might as well accept URLs there too in case it changes its
530   # mind.)
531   my $repo = $git_dir || $Job->{'repository'};
532
533   # Repository can be remote or local. If remote, we'll need to fetch it
534   # to a local dir before doing `git log` et al.
535   my $repo_location;
536
537   if ($repo =~ m{://|^[^/]*:}) {
538     # $repo is a git url we can clone, like git:// or https:// or
539     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
540     # not recognized here because distinguishing that from a local
541     # path is too fragile. If you really need something strange here,
542     # use the ssh:// form.
543     $repo_location = 'remote';
544   } elsif ($repo =~ m{^\.*/}) {
545     # $repo is a local path to a git index. We'll also resolve ../foo
546     # to ../foo/.git if the latter is a directory. To help
547     # disambiguate local paths from named hosted repositories, this
548     # form must be given as ./ or ../ if it's a relative path.
549     if (-d "$repo/.git") {
550       $repo = "$repo/.git";
551     }
552     $repo_location = 'local';
553   } else {
554     # $repo is none of the above. It must be the name of a hosted
555     # repository.
556     my $arv_repo_list = api_call("repositories/list",
557                                  'filters' => [['name','=',$repo]]);
558     my @repos_found = @{$arv_repo_list->{'items'}};
559     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
560     if ($n_found > 0) {
561       Log(undef, "Repository '$repo' -> "
562           . join(", ", map { $_->{'uuid'} } @repos_found));
563     }
564     if ($n_found != 1) {
565       croak("Error: Found $n_found repositories with name '$repo'.");
566     }
567     $repo = $repos_found[0]->{'fetch_url'};
568     $repo_location = 'remote';
569   }
570   Log(undef, "Using $repo_location repository '$repo'");
571   $ENV{"CRUNCH_SRC_URL"} = $repo;
572
573   # Resolve given script_version (we'll call that $treeish here) to a
574   # commit sha1 ($commit).
575   my $treeish = $Job->{'script_version'};
576   my $commit;
577   if ($repo_location eq 'remote') {
578     # We minimize excess object-fetching by re-using the same bare
579     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
580     # just keep adding remotes to it as needed.
581     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
582     my $gitcmd = "git --git-dir=\Q$local_repo\E";
583
584     # Set up our local repo for caching remote objects, making
585     # archives, etc.
586     if (!-d $local_repo) {
587       make_path($local_repo) or croak("Error: could not create $local_repo");
588     }
589     # This works (exits 0 and doesn't delete fetched objects) even
590     # if $local_repo is already initialized:
591     `$gitcmd init --bare`;
592     if ($?) {
593       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
594     }
595
596     # If $treeish looks like a hash (or abbrev hash) we look it up in
597     # our local cache first, since that's cheaper. (We don't want to
598     # do that with tags/branches though -- those change over time, so
599     # they should always be resolved by the remote repo.)
600     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
601       # Hide stderr because it's normal for this to fail:
602       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
603       if ($? == 0 &&
604           # Careful not to resolve a branch named abcdeff to commit 1234567:
605           $sha1 =~ /^$treeish/ &&
606           $sha1 =~ /^([0-9a-f]{40})$/s) {
607         $commit = $1;
608         Log(undef, "Commit $commit already present in $local_repo");
609       }
610     }
611
612     if (!defined $commit) {
613       # If $treeish isn't just a hash or abbrev hash, or isn't here
614       # yet, we need to fetch the remote to resolve it correctly.
615
616       # First, remove all local heads. This prevents a name that does
617       # not exist on the remote from resolving to (or colliding with)
618       # a previously fetched branch or tag (possibly from a different
619       # remote).
620       remove_tree("$local_repo/refs/heads", {keep_root => 1});
621
622       Log(undef, "Fetching objects from $repo to $local_repo");
623       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
624       if ($?) {
625         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
626       }
627     }
628
629     # Now that the data is all here, we will use our local repo for
630     # the rest of our git activities.
631     $repo = $local_repo;
632   }
633
634   my $gitcmd = "git --git-dir=\Q$repo\E";
635   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
636   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
637     croak("`$gitcmd rev-list` exited "
638           .exit_status_s($?)
639           .", '$treeish' not found, giving up");
640   }
641   $commit = $1;
642   Log(undef, "Version $treeish is commit $commit");
643
644   if ($commit ne $Job->{'script_version'}) {
645     # Record the real commit id in the database, frozentokey, logs,
646     # etc. -- instead of an abbreviation or a branch name which can
647     # become ambiguous or point to a different commit in the future.
648     if (!$Job->update_attributes('script_version' => $commit)) {
649       croak("Error: failed to update job's script_version attribute");
650     }
651   }
652
653   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
654   add_git_archive("$gitcmd archive ''\Q$commit\E");
655 }
656
657 my $git_archive = combined_git_archive();
658 if (!defined $git_archive) {
659   Log(undef, "Skip install phase (no git archive)");
660   if ($have_slurm) {
661     Log(undef, "Warning: This probably means workers have no source tree!");
662   }
663 }
664 else {
665   my $install_exited;
666   my $install_script_tries_left = 3;
667   for (my $attempts = 0; $attempts < 3; $attempts++) {
668     Log(undef, "Run install script on all workers");
669
670     my @srunargs = ("srun",
671                     "--nodelist=$nodelist",
672                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
673     my @execargs = ("sh", "-c",
674                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
675
676     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
677     my ($install_stderr_r, $install_stderr_w);
678     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
679     set_nonblocking($install_stderr_r);
680     my $installpid = fork();
681     if ($installpid == 0)
682     {
683       close($install_stderr_r);
684       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
685       open(STDOUT, ">&", $install_stderr_w);
686       open(STDERR, ">&", $install_stderr_w);
687       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
688       exit (1);
689     }
690     close($install_stderr_w);
691     # Tell freeze_if_want_freeze how to kill the child, otherwise the
692     # "waitpid(installpid)" loop won't get interrupted by a freeze:
693     $proc{$installpid} = {};
694     my $stderr_buf = '';
695     # Track whether anything appears on stderr other than slurm errors
696     # ("srun: ...") and the "starting: ..." message printed by the
697     # srun subroutine itself:
698     my $stderr_anything_from_script = 0;
699     my $match_our_own_errors = '^(srun: error: |starting: \[)';
700     while ($installpid != waitpid(-1, WNOHANG)) {
701       freeze_if_want_freeze ($installpid);
702       # Wait up to 0.1 seconds for something to appear on stderr, then
703       # do a non-blocking read.
704       my $bits = fhbits($install_stderr_r);
705       select ($bits, undef, $bits, 0.1);
706       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
707       {
708         while ($stderr_buf =~ /^(.*?)\n/) {
709           my $line = $1;
710           substr $stderr_buf, 0, 1+length($line), "";
711           Log(undef, "stderr $line");
712           if ($line !~ /$match_our_own_errors/) {
713             $stderr_anything_from_script = 1;
714           }
715         }
716       }
717     }
718     delete $proc{$installpid};
719     $install_exited = $?;
720     close($install_stderr_r);
721     if (length($stderr_buf) > 0) {
722       if ($stderr_buf !~ /$match_our_own_errors/) {
723         $stderr_anything_from_script = 1;
724       }
725       Log(undef, "stderr $stderr_buf")
726     }
727
728     Log (undef, "Install script exited ".exit_status_s($install_exited));
729     last if $install_exited == 0 || $main::please_freeze;
730     # If the install script fails but doesn't print an error message,
731     # the next thing anyone is likely to do is just run it again in
732     # case it was a transient problem like "slurm communication fails
733     # because the network isn't reliable enough". So we'll just do
734     # that ourselves (up to 3 attempts in total). OTOH, if there is an
735     # error message, the problem is more likely to have a real fix and
736     # we should fail the job so the fixing process can start, instead
737     # of doing 2 more attempts.
738     last if $stderr_anything_from_script;
739   }
740
741   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
742     unlink($tar_filename);
743   }
744
745   if ($install_exited != 0) {
746     croak("Giving up");
747   }
748 }
749
750 foreach (qw (script script_version script_parameters runtime_constraints))
751 {
752   Log (undef,
753        "$_ " .
754        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
755 }
756 foreach (split (/\n/, $Job->{knobs}))
757 {
758   Log (undef, "knob " . $_);
759 }
760
761
762
763 $main::success = undef;
764
765
766
767 ONELEVEL:
768
769 my $thisround_succeeded = 0;
770 my $thisround_failed = 0;
771 my $thisround_failed_multiple = 0;
772 my $working_slot_count = scalar(@slot);
773
774 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
775                        or $a <=> $b } @jobstep_todo;
776 my $level = $jobstep[$jobstep_todo[0]]->{level};
777
778 my $initial_tasks_this_level = 0;
779 foreach my $id (@jobstep_todo) {
780   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
781 }
782
783 # If the number of tasks scheduled at this level #T is smaller than the number
784 # of slots available #S, only use the first #T slots, or the first slot on
785 # each node, whichever number is greater.
786 #
787 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
788 # based on these numbers.  Using fewer slots makes more resources available
789 # to each individual task, which should normally be a better strategy when
790 # there are fewer of them running with less parallelism.
791 #
792 # Note that this calculation is not redone if the initial tasks at
793 # this level queue more tasks at the same level.  This may harm
794 # overall task throughput for that level.
795 my @freeslot;
796 if ($initial_tasks_this_level < @node) {
797   @freeslot = (0..$#node);
798 } elsif ($initial_tasks_this_level < @slot) {
799   @freeslot = (0..$initial_tasks_this_level - 1);
800 } else {
801   @freeslot = (0..$#slot);
802 }
803 my $round_num_freeslots = scalar(@freeslot);
804
805 my %round_max_slots = ();
806 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
807   my $this_slot = $slot[$freeslot[$ii]];
808   my $node_name = $this_slot->{node}->{name};
809   $round_max_slots{$node_name} ||= $this_slot->{cpu};
810   last if (scalar(keys(%round_max_slots)) >= @node);
811 }
812
813 Log(undef, "start level $level with $round_num_freeslots slots");
814 my @holdslot;
815 my %reader;
816 my $progress_is_dirty = 1;
817 my $progress_stats_updated = 0;
818
819 update_progress_stats();
820
821
822 THISROUND:
823 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
824 {
825   # Don't create new tasks if we already know the job's final result.
826   last if defined($main::success);
827
828   my $id = $jobstep_todo[$todo_ptr];
829   my $Jobstep = $jobstep[$id];
830   if ($Jobstep->{level} != $level)
831   {
832     next;
833   }
834
835   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
836   set_nonblocking($reader{$id});
837
838   my $childslot = $freeslot[0];
839   my $childnode = $slot[$childslot]->{node};
840   my $childslotname = join (".",
841                             $slot[$childslot]->{node}->{name},
842                             $slot[$childslot]->{cpu});
843
844   my $childpid = fork();
845   if ($childpid == 0)
846   {
847     $SIG{'INT'} = 'DEFAULT';
848     $SIG{'QUIT'} = 'DEFAULT';
849     $SIG{'TERM'} = 'DEFAULT';
850
851     foreach (values (%reader))
852     {
853       close($_);
854     }
855     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
856     open(STDOUT,">&writer");
857     open(STDERR,">&writer");
858
859     undef $dbh;
860     undef $sth;
861
862     delete $ENV{"GNUPGHOME"};
863     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
864     $ENV{"TASK_QSEQUENCE"} = $id;
865     $ENV{"TASK_SEQUENCE"} = $level;
866     $ENV{"JOB_SCRIPT"} = $Job->{script};
867     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
868       $param =~ tr/a-z/A-Z/;
869       $ENV{"JOB_PARAMETER_$param"} = $value;
870     }
871     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
872     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
873     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
874     $ENV{"HOME"} = $ENV{"TASK_WORK"};
875     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
876     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
877     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
878
879     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
880
881     $ENV{"GZIP"} = "-n";
882
883     my @srunargs = (
884       "srun",
885       "--nodelist=".$childnode->{name},
886       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
887       "--job-name=$job_id.$id.$$",
888         );
889
890     my $stdbuf = " stdbuf --output=0 --error=0 ";
891
892     my $arv_file_cache = "";
893     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
894       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
895     }
896
897     my $command =
898         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
899         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
900         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
901         # These environment variables get used explicitly later in
902         # $command.  No tool is expected to read these values directly.
903         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
904         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
905         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
906         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
907
908     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
909     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
910     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
911
912     if ($docker_hash)
913     {
914       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
915       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
916       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
917       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
918       # We only set memory limits if Docker lets us limit both memory and swap.
919       # Memory limits alone have been supported longer, but subprocesses tend
920       # to get SIGKILL if they exceed that without any swap limit set.
921       # See #5642 for additional background.
922       if ($docker_limitmem) {
923         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
924       }
925
926       # The source tree and $destdir directory (which we have
927       # installed on the worker host) are available in the container,
928       # under the same path.
929       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
930       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
931
932       # Currently, we make the "by_pdh" directory in arv-mount's mount
933       # point appear at /keep inside the container (instead of using
934       # the same path as the host like we do with CRUNCH_SRC and
935       # CRUNCH_INSTALL). However, crunch scripts and utilities must
936       # not rely on this. They must use $TASK_KEEPMOUNT.
937       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
938       $ENV{TASK_KEEPMOUNT} = "/keep";
939
940       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
941       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
942       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
943
944       # TASK_WORK is almost exactly like a docker data volume: it
945       # starts out empty, is writable, and persists until no
946       # containers use it any more. We don't use --volumes-from to
947       # share it with other containers: it is only accessible to this
948       # task, and it goes away when this task stops.
949       #
950       # However, a docker data volume is writable only by root unless
951       # the mount point already happens to exist in the container with
952       # different permissions. Therefore, we [1] assume /tmp already
953       # exists in the image and is writable by the crunch user; [2]
954       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
955       # writable if they are created by docker while setting up the
956       # other --volumes); and [3] create $TASK_WORK inside the
957       # container using $build_script.
958       $command .= "--volume=/tmp ";
959       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
960       $ENV{"HOME"} = $ENV{"TASK_WORK"};
961       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
962
963       # TODO: Share a single JOB_WORK volume across all task
964       # containers on a given worker node, and delete it when the job
965       # ends (and, in case that doesn't work, when the next job
966       # starts).
967       #
968       # For now, use the same approach as TASK_WORK above.
969       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
970
971       while (my ($env_key, $env_val) = each %ENV)
972       {
973         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
974           $command .= "--env=\Q$env_key=$env_val\E ";
975         }
976       }
977       $command .= "--env=\QHOME=$ENV{HOME}\E ";
978       $command .= "\Q$docker_hash\E ";
979
980       if ($Job->{arvados_sdk_version}) {
981         $command .= $stdbuf;
982         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
983       } else {
984         $command .= "/bin/sh -c \'python -c " .
985             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
986             ">&2 2>/dev/null; " .
987             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
988             "if which stdbuf >/dev/null ; then " .
989             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
990             " else " .
991             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
992             " fi\'";
993       }
994     } else {
995       # Non-docker run
996       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
997       $command .= $stdbuf;
998       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
999     }
1000
1001     my @execargs = ('bash', '-c', $command);
1002     srun (\@srunargs, \@execargs, undef, $build_script);
1003     # exec() failed, we assume nothing happened.
1004     die "srun() failed on build script\n";
1005   }
1006   close("writer");
1007   if (!defined $childpid)
1008   {
1009     close $reader{$id};
1010     delete $reader{$id};
1011     next;
1012   }
1013   shift @freeslot;
1014   $proc{$childpid} = { jobstep => $id,
1015                        time => time,
1016                        slot => $childslot,
1017                        jobstepname => "$job_id.$id.$childpid",
1018                      };
1019   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1020   $slot[$childslot]->{pid} = $childpid;
1021
1022   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1023   Log ($id, "child $childpid started on $childslotname");
1024   $Jobstep->{starttime} = time;
1025   $Jobstep->{node} = $childnode->{name};
1026   $Jobstep->{slotindex} = $childslot;
1027   delete $Jobstep->{stderr};
1028   delete $Jobstep->{finishtime};
1029   delete $Jobstep->{tempfail};
1030
1031   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1032   $Jobstep->{'arvados_task'}->save;
1033
1034   splice @jobstep_todo, $todo_ptr, 1;
1035   --$todo_ptr;
1036
1037   $progress_is_dirty = 1;
1038
1039   while (!@freeslot
1040          ||
1041          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1042   {
1043     last THISROUND if $main::please_freeze;
1044     if ($main::please_info)
1045     {
1046       $main::please_info = 0;
1047       freeze();
1048       create_output_collection();
1049       save_meta(1);
1050       update_progress_stats();
1051     }
1052     my $gotsome
1053         = readfrompipes ()
1054         + reapchildren ();
1055     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1056     {
1057       check_refresh_wanted();
1058       check_squeue();
1059       update_progress_stats();
1060     }
1061     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1062     {
1063       update_progress_stats();
1064     }
1065     if (!$gotsome) {
1066       select (undef, undef, undef, 0.1);
1067     }
1068     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1069                                         $_->{node}->{hold_count} < 4 } @slot);
1070     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1071         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1072     {
1073       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1074           .($thisround_failed+$thisround_succeeded)
1075           .") -- giving up on this round";
1076       Log (undef, $message);
1077       last THISROUND;
1078     }
1079
1080     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1081     for (my $i=$#freeslot; $i>=0; $i--) {
1082       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1083         push @holdslot, (splice @freeslot, $i, 1);
1084       }
1085     }
1086     for (my $i=$#holdslot; $i>=0; $i--) {
1087       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1088         push @freeslot, (splice @holdslot, $i, 1);
1089       }
1090     }
1091
1092     # give up if no nodes are succeeding
1093     if ($working_slot_count < 1) {
1094       Log(undef, "Every node has failed -- giving up");
1095       last THISROUND;
1096     }
1097   }
1098 }
1099
1100
1101 push @freeslot, splice @holdslot;
1102 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1103
1104
1105 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1106 while (%proc)
1107 {
1108   if ($main::please_continue) {
1109     $main::please_continue = 0;
1110     goto THISROUND;
1111   }
1112   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1113   readfrompipes ();
1114   if (!reapchildren())
1115   {
1116     check_refresh_wanted();
1117     check_squeue();
1118     update_progress_stats();
1119     select (undef, undef, undef, 0.1);
1120     killem (keys %proc) if $main::please_freeze;
1121   }
1122 }
1123
1124 update_progress_stats();
1125 freeze_if_want_freeze();
1126
1127
1128 if (!defined $main::success)
1129 {
1130   if (!@jobstep_todo) {
1131     $main::success = 1;
1132   } elsif ($working_slot_count < 1) {
1133     save_output_collection();
1134     save_meta();
1135     exit(EX_RETRY_UNLOCKED);
1136   } elsif ($thisround_succeeded == 0 &&
1137            ($thisround_failed == 0 || $thisround_failed > 4)) {
1138     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1139     Log (undef, $message);
1140     $main::success = 0;
1141   }
1142 }
1143
1144 goto ONELEVEL if !defined $main::success;
1145
1146
1147 release_allocation();
1148 freeze();
1149 my $collated_output = save_output_collection();
1150 Log (undef, "finish");
1151
1152 save_meta();
1153
1154 my $final_state;
1155 if ($collated_output && $main::success) {
1156   $final_state = 'Complete';
1157 } else {
1158   $final_state = 'Failed';
1159 }
1160 $Job->update_attributes('state' => $final_state);
1161
1162 exit (($final_state eq 'Complete') ? 0 : 1);
1163
1164
1165
1166 sub update_progress_stats
1167 {
1168   $progress_stats_updated = time;
1169   return if !$progress_is_dirty;
1170   my ($todo, $done, $running) = (scalar @jobstep_todo,
1171                                  scalar @jobstep_done,
1172                                  scalar keys(%proc));
1173   $Job->{'tasks_summary'} ||= {};
1174   $Job->{'tasks_summary'}->{'todo'} = $todo;
1175   $Job->{'tasks_summary'}->{'done'} = $done;
1176   $Job->{'tasks_summary'}->{'running'} = $running;
1177   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1178   Log (undef, "status: $done done, $running running, $todo todo");
1179   $progress_is_dirty = 0;
1180 }
1181
1182
1183
1184 sub reapchildren
1185 {
1186   my $pid = waitpid (-1, WNOHANG);
1187   return 0 if $pid <= 0;
1188
1189   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1190                   . "."
1191                   . $slot[$proc{$pid}->{slot}]->{cpu});
1192   my $jobstepid = $proc{$pid}->{jobstep};
1193   my $elapsed = time - $proc{$pid}->{time};
1194   my $Jobstep = $jobstep[$jobstepid];
1195
1196   my $childstatus = $?;
1197   my $exitvalue = $childstatus >> 8;
1198   my $exitinfo = "exit ".exit_status_s($childstatus);
1199   $Jobstep->{'arvados_task'}->reload;
1200   my $task_success = $Jobstep->{'arvados_task'}->{success};
1201
1202   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1203
1204   if (!defined $task_success) {
1205     # task did not indicate one way or the other --> fail
1206     Log($jobstepid, sprintf(
1207           "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1208           exit_status_s($childstatus)));
1209     $Jobstep->{'arvados_task'}->{success} = 0;
1210     $Jobstep->{'arvados_task'}->save;
1211     $task_success = 0;
1212   }
1213
1214   if (!$task_success)
1215   {
1216     my $temporary_fail;
1217     $temporary_fail ||= $Jobstep->{tempfail};
1218     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1219
1220     ++$thisround_failed;
1221     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1222
1223     # Check for signs of a failed or misconfigured node
1224     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1225         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1226       # Don't count this against jobstep failure thresholds if this
1227       # node is already suspected faulty and srun exited quickly
1228       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1229           $elapsed < 5) {
1230         Log ($jobstepid, "blaming failure on suspect node " .
1231              $slot[$proc{$pid}->{slot}]->{node}->{name});
1232         $temporary_fail ||= 1;
1233       }
1234       ban_node_by_slot($proc{$pid}->{slot});
1235     }
1236
1237     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1238                              ++$Jobstep->{'failures'},
1239                              $temporary_fail ? 'temporary' : 'permanent',
1240                              $elapsed));
1241
1242     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1243       # Give up on this task, and the whole job
1244       $main::success = 0;
1245     }
1246     # Put this task back on the todo queue
1247     push @jobstep_todo, $jobstepid;
1248     $Job->{'tasks_summary'}->{'failed'}++;
1249   }
1250   else
1251   {
1252     ++$thisround_succeeded;
1253     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1254     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1255     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1256     push @jobstep_done, $jobstepid;
1257     Log ($jobstepid, "success in $elapsed seconds");
1258   }
1259   $Jobstep->{exitcode} = $childstatus;
1260   $Jobstep->{finishtime} = time;
1261   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1262   $Jobstep->{'arvados_task'}->save;
1263   process_stderr ($jobstepid, $task_success);
1264   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1265                            length($Jobstep->{'arvados_task'}->{output}),
1266                            $Jobstep->{'arvados_task'}->{output}));
1267
1268   close $reader{$jobstepid};
1269   delete $reader{$jobstepid};
1270   delete $slot[$proc{$pid}->{slot}]->{pid};
1271   push @freeslot, $proc{$pid}->{slot};
1272   delete $proc{$pid};
1273
1274   if ($task_success) {
1275     # Load new tasks
1276     my $newtask_list = [];
1277     my $newtask_results;
1278     do {
1279       $newtask_results = api_call(
1280         "job_tasks/list",
1281         'where' => {
1282           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1283         },
1284         'order' => 'qsequence',
1285         'offset' => scalar(@$newtask_list),
1286       );
1287       push(@$newtask_list, @{$newtask_results->{items}});
1288     } while (@{$newtask_results->{items}});
1289     foreach my $arvados_task (@$newtask_list) {
1290       my $jobstep = {
1291         'level' => $arvados_task->{'sequence'},
1292         'failures' => 0,
1293         'arvados_task' => $arvados_task
1294       };
1295       push @jobstep, $jobstep;
1296       push @jobstep_todo, $#jobstep;
1297     }
1298   }
1299
1300   $progress_is_dirty = 1;
1301   1;
1302 }
1303
1304 sub check_refresh_wanted
1305 {
1306   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1307   if (@stat && $stat[9] > $latest_refresh) {
1308     $latest_refresh = scalar time;
1309     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1310     for my $attr ('cancelled_at',
1311                   'cancelled_by_user_uuid',
1312                   'cancelled_by_client_uuid',
1313                   'state') {
1314       $Job->{$attr} = $Job2->{$attr};
1315     }
1316     if ($Job->{'state'} ne "Running") {
1317       if ($Job->{'state'} eq "Cancelled") {
1318         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1319       } else {
1320         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1321       }
1322       $main::success = 0;
1323       $main::please_freeze = 1;
1324     }
1325   }
1326 }
1327
1328 sub check_squeue
1329 {
1330   my $last_squeue_check = $squeue_checked;
1331
1332   # Do not call `squeue` or check the kill list more than once every
1333   # 15 seconds.
1334   return if $last_squeue_check > time - 15;
1335   $squeue_checked = time;
1336
1337   # Look for children from which we haven't received stderr data since
1338   # the last squeue check. If no such children exist, all procs are
1339   # alive and there's no need to even look at squeue.
1340   #
1341   # As long as the crunchstat poll interval (10s) is shorter than the
1342   # squeue check interval (15s) this should make the squeue check an
1343   # infrequent event.
1344   my $silent_procs = 0;
1345   for my $jobstep (values %proc)
1346   {
1347     if ($jobstep->{stderr_at} < $last_squeue_check)
1348     {
1349       $silent_procs++;
1350     }
1351   }
1352   return if $silent_procs == 0;
1353
1354   # use killem() on procs whose killtime is reached
1355   while (my ($pid, $jobstep) = each %proc)
1356   {
1357     if (exists $jobstep->{killtime}
1358         && $jobstep->{killtime} <= time
1359         && $jobstep->{stderr_at} < $last_squeue_check)
1360     {
1361       my $sincewhen = "";
1362       if ($jobstep->{stderr_at}) {
1363         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1364       }
1365       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1366       killem ($pid);
1367     }
1368   }
1369
1370   if (!$have_slurm)
1371   {
1372     # here is an opportunity to check for mysterious problems with local procs
1373     return;
1374   }
1375
1376   # Get a list of steps still running.  Note: squeue(1) says --steps
1377   # selects a format (which we override anyway) and allows us to
1378   # specify which steps we're interested in (which we don't).
1379   # Importantly, it also changes the meaning of %j from "job name" to
1380   # "step name" and (although this isn't mentioned explicitly in the
1381   # docs) switches from "one line per job" mode to "one line per step"
1382   # mode. Without it, we'd just get a list of one job, instead of a
1383   # list of N steps.
1384   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1385   if ($? != 0)
1386   {
1387     Log(undef, "warning: squeue exit status $? ($!)");
1388     return;
1389   }
1390   chop @squeue;
1391
1392   # which of my jobsteps are running, according to squeue?
1393   my %ok;
1394   for my $jobstepname (@squeue)
1395   {
1396     $ok{$jobstepname} = 1;
1397   }
1398
1399   # Check for child procs >60s old and not mentioned by squeue.
1400   while (my ($pid, $jobstep) = each %proc)
1401   {
1402     if ($jobstep->{time} < time - 60
1403         && $jobstep->{jobstepname}
1404         && !exists $ok{$jobstep->{jobstepname}}
1405         && !exists $jobstep->{killtime})
1406     {
1407       # According to slurm, this task has ended (successfully or not)
1408       # -- but our srun child hasn't exited. First we must wait (30
1409       # seconds) in case this is just a race between communication
1410       # channels. Then, if our srun child process still hasn't
1411       # terminated, we'll conclude some slurm communication
1412       # error/delay has caused the task to die without notifying srun,
1413       # and we'll kill srun ourselves.
1414       $jobstep->{killtime} = time + 30;
1415       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1416     }
1417   }
1418 }
1419
1420
1421 sub release_allocation
1422 {
1423   if ($have_slurm)
1424   {
1425     Log (undef, "release job allocation");
1426     system "scancel $ENV{SLURM_JOB_ID}";
1427   }
1428 }
1429
1430
1431 sub readfrompipes
1432 {
1433   my $gotsome = 0;
1434   foreach my $job (keys %reader)
1435   {
1436     my $buf;
1437     if (0 < sysread ($reader{$job}, $buf, 65536))
1438     {
1439       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1440       $jobstep[$job]->{stderr_at} = time;
1441       $jobstep[$job]->{stderr} .= $buf;
1442
1443       # Consume everything up to the last \n
1444       preprocess_stderr ($job);
1445
1446       if (length ($jobstep[$job]->{stderr}) > 16384)
1447       {
1448         # If we get a lot of stderr without a newline, chop off the
1449         # front to avoid letting our buffer grow indefinitely.
1450         substr ($jobstep[$job]->{stderr},
1451                 0, length($jobstep[$job]->{stderr}) - 8192) = "";
1452       }
1453       $gotsome = 1;
1454     }
1455   }
1456   return $gotsome;
1457 }
1458
1459
1460 sub preprocess_stderr
1461 {
1462   my $job = shift;
1463
1464   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1465     my $line = $1;
1466     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1467     Log ($job, "stderr $line");
1468     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1469       # whoa.
1470       $main::please_freeze = 1;
1471     }
1472     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1473       my $job_slot_index = $jobstep[$job]->{slotindex};
1474       $slot[$job_slot_index]->{node}->{fail_count}++;
1475       $jobstep[$job]->{tempfail} = 1;
1476       ban_node_by_slot($job_slot_index);
1477     }
1478     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1479       $jobstep[$job]->{tempfail} = 1;
1480       ban_node_by_slot($jobstep[$job]->{slotindex});
1481     }
1482     elsif ($line =~ /arvados\.errors\.Keep/) {
1483       $jobstep[$job]->{tempfail} = 1;
1484     }
1485   }
1486 }
1487
1488
1489 sub process_stderr
1490 {
1491   my $job = shift;
1492   my $task_success = shift;
1493   preprocess_stderr ($job);
1494
1495   map {
1496     Log ($job, "stderr $_");
1497   } split ("\n", $jobstep[$job]->{stderr});
1498 }
1499
1500 sub fetch_block
1501 {
1502   my $hash = shift;
1503   my $keep;
1504   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1505     Log(undef, "fetch_block run error from arv-get $hash: $!");
1506     return undef;
1507   }
1508   my $output_block = "";
1509   while (1) {
1510     my $buf;
1511     my $bytes = sysread($keep, $buf, 1024 * 1024);
1512     if (!defined $bytes) {
1513       Log(undef, "fetch_block read error from arv-get: $!");
1514       $output_block = undef;
1515       last;
1516     } elsif ($bytes == 0) {
1517       # sysread returns 0 at the end of the pipe.
1518       last;
1519     } else {
1520       # some bytes were read into buf.
1521       $output_block .= $buf;
1522     }
1523   }
1524   close $keep;
1525   if ($?) {
1526     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1527     $output_block = undef;
1528   }
1529   return $output_block;
1530 }
1531
1532 # Create a collection by concatenating the output of all tasks (each
1533 # task's output is either a manifest fragment, a locator for a
1534 # manifest fragment stored in Keep, or nothing at all). Return the
1535 # portable_data_hash of the new collection.
1536 sub create_output_collection
1537 {
1538   Log (undef, "collate");
1539
1540   my ($child_out, $child_in);
1541   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1542 import arvados
1543 import sys
1544 print (arvados.api("v1").collections().
1545        create(body={"manifest_text": sys.stdin.read()}).
1546        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1547 }, retry_count());
1548
1549   my $task_idx = -1;
1550   my $manifest_size = 0;
1551   for (@jobstep)
1552   {
1553     ++$task_idx;
1554     my $output = $_->{'arvados_task'}->{output};
1555     next if (!defined($output));
1556     my $next_write;
1557     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1558       $next_write = fetch_block($output);
1559     } else {
1560       $next_write = $output;
1561     }
1562     if (defined($next_write)) {
1563       if (!defined(syswrite($child_in, $next_write))) {
1564         # There's been an error writing.  Stop the loop.
1565         # We'll log details about the exit code later.
1566         last;
1567       } else {
1568         $manifest_size += length($next_write);
1569       }
1570     } else {
1571       my $uuid = $_->{'arvados_task'}->{'uuid'};
1572       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1573       $main::success = 0;
1574     }
1575   }
1576   close($child_in);
1577   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1578
1579   my $joboutput;
1580   my $s = IO::Select->new($child_out);
1581   if ($s->can_read(120)) {
1582     sysread($child_out, $joboutput, 1024 * 1024);
1583     waitpid($pid, 0);
1584     if ($?) {
1585       Log(undef, "output collection creation exited " . exit_status_s($?));
1586       $joboutput = undef;
1587     } else {
1588       chomp($joboutput);
1589     }
1590   } else {
1591     Log (undef, "timed out while creating output collection");
1592     foreach my $signal (2, 2, 2, 15, 15, 9) {
1593       kill($signal, $pid);
1594       last if waitpid($pid, WNOHANG) == -1;
1595       sleep(1);
1596     }
1597   }
1598   close($child_out);
1599
1600   return $joboutput;
1601 }
1602
1603 # Calls create_output_collection, logs the result, and returns it.
1604 # If that was successful, save that as the output in the job record.
1605 sub save_output_collection {
1606   my $collated_output = create_output_collection();
1607
1608   if (!$collated_output) {
1609     Log(undef, "Failed to write output collection");
1610   }
1611   else {
1612     Log(undef, "job output $collated_output");
1613     $Job->update_attributes('output' => $collated_output);
1614   }
1615   return $collated_output;
1616 }
1617
1618 sub killem
1619 {
1620   foreach (@_)
1621   {
1622     my $sig = 2;                # SIGINT first
1623     if (exists $proc{$_}->{"sent_$sig"} &&
1624         time - $proc{$_}->{"sent_$sig"} > 4)
1625     {
1626       $sig = 15;                # SIGTERM if SIGINT doesn't work
1627     }
1628     if (exists $proc{$_}->{"sent_$sig"} &&
1629         time - $proc{$_}->{"sent_$sig"} > 4)
1630     {
1631       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1632     }
1633     if (!exists $proc{$_}->{"sent_$sig"})
1634     {
1635       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1636       kill $sig, $_;
1637       select (undef, undef, undef, 0.1);
1638       if ($sig == 2)
1639       {
1640         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1641       }
1642       $proc{$_}->{"sent_$sig"} = time;
1643       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1644     }
1645   }
1646 }
1647
1648
1649 sub fhbits
1650 {
1651   my($bits);
1652   for (@_) {
1653     vec($bits,fileno($_),1) = 1;
1654   }
1655   $bits;
1656 }
1657
1658
1659 # Send log output to Keep via arv-put.
1660 #
1661 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1662 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1663 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1664 # $log_pipe_pid is the pid of the arv-put subprocess.
1665 #
1666 # The only functions that should access these variables directly are:
1667 #
1668 # log_writer_start($logfilename)
1669 #     Starts an arv-put pipe, reading data on stdin and writing it to
1670 #     a $logfilename file in an output collection.
1671 #
1672 # log_writer_read_output([$timeout])
1673 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1674 #     Passes $timeout to the select() call, with a default of 0.01.
1675 #     Returns the result of the last read() call on $log_pipe_out, or
1676 #     -1 if read() wasn't called because select() timed out.
1677 #     Only other log_writer_* functions should need to call this.
1678 #
1679 # log_writer_send($txt)
1680 #     Writes $txt to the output log collection.
1681 #
1682 # log_writer_finish()
1683 #     Closes the arv-put pipe and returns the output that it produces.
1684 #
1685 # log_writer_is_active()
1686 #     Returns a true value if there is currently a live arv-put
1687 #     process, false otherwise.
1688 #
1689 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1690     $log_pipe_pid);
1691
1692 sub log_writer_start($)
1693 {
1694   my $logfilename = shift;
1695   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1696                         'arv-put',
1697                         '--stream',
1698                         '--retries', '3',
1699                         '--filename', $logfilename,
1700                         '-');
1701   $log_pipe_out_buf = "";
1702   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1703 }
1704
1705 sub log_writer_read_output {
1706   my $timeout = shift || 0.01;
1707   my $read = -1;
1708   while ($read && $log_pipe_out_select->can_read($timeout)) {
1709     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1710                  length($log_pipe_out_buf));
1711   }
1712   if (!defined($read)) {
1713     Log(undef, "error reading log manifest from arv-put: $!");
1714   }
1715   return $read;
1716 }
1717
1718 sub log_writer_send($)
1719 {
1720   my $txt = shift;
1721   print $log_pipe_in $txt;
1722   log_writer_read_output();
1723 }
1724
1725 sub log_writer_finish()
1726 {
1727   return unless $log_pipe_pid;
1728
1729   close($log_pipe_in);
1730
1731   my $logger_failed = 0;
1732   my $read_result = log_writer_read_output(120);
1733   if ($read_result == -1) {
1734     $logger_failed = -1;
1735     Log (undef, "timed out reading from 'arv-put'");
1736   } elsif ($read_result != 0) {
1737     $logger_failed = -2;
1738     Log(undef, "failed to read arv-put log manifest to EOF");
1739   }
1740
1741   waitpid($log_pipe_pid, 0);
1742   if ($?) {
1743     $logger_failed ||= $?;
1744     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1745   }
1746
1747   close($log_pipe_out);
1748   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1749   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1750       $log_pipe_out_select = undef;
1751
1752   return $arv_put_output;
1753 }
1754
1755 sub log_writer_is_active() {
1756   return $log_pipe_pid;
1757 }
1758
1759 sub Log                         # ($jobstep_id, $logmessage)
1760 {
1761   if ($_[1] =~ /\n/) {
1762     for my $line (split (/\n/, $_[1])) {
1763       Log ($_[0], $line);
1764     }
1765     return;
1766   }
1767   my $fh = select STDERR; $|=1; select $fh;
1768   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1769   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1770   $message .= "\n";
1771   my $datetime;
1772   if (log_writer_is_active() || -t STDERR) {
1773     my @gmtime = gmtime;
1774     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1775                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1776   }
1777   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1778
1779   if (log_writer_is_active()) {
1780     log_writer_send($datetime . " " . $message);
1781   }
1782 }
1783
1784
1785 sub croak
1786 {
1787   my ($package, $file, $line) = caller;
1788   my $message = "@_ at $file line $line\n";
1789   Log (undef, $message);
1790   freeze() if @jobstep_todo;
1791   create_output_collection() if @jobstep_todo;
1792   cleanup();
1793   save_meta();
1794   die;
1795 }
1796
1797
1798 sub cleanup
1799 {
1800   return unless $Job;
1801   if ($Job->{'state'} eq 'Cancelled') {
1802     $Job->update_attributes('finished_at' => scalar gmtime);
1803   } else {
1804     $Job->update_attributes('state' => 'Failed');
1805   }
1806 }
1807
1808
1809 sub save_meta
1810 {
1811   my $justcheckpoint = shift; # false if this will be the last meta saved
1812   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1813   return unless log_writer_is_active();
1814   my $log_manifest = log_writer_finish();
1815   return unless defined($log_manifest);
1816
1817   if ($Job->{log}) {
1818     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1819     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1820   }
1821
1822   my $log_coll = api_call(
1823     "collections/create", ensure_unique_name => 1, collection => {
1824       manifest_text => $log_manifest,
1825       owner_uuid => $Job->{owner_uuid},
1826       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1827     });
1828   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1829   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1830 }
1831
1832
1833 sub freeze_if_want_freeze
1834 {
1835   if ($main::please_freeze)
1836   {
1837     release_allocation();
1838     if (@_)
1839     {
1840       # kill some srun procs before freeze+stop
1841       map { $proc{$_} = {} } @_;
1842       while (%proc)
1843       {
1844         killem (keys %proc);
1845         select (undef, undef, undef, 0.1);
1846         my $died;
1847         while (($died = waitpid (-1, WNOHANG)) > 0)
1848         {
1849           delete $proc{$died};
1850         }
1851       }
1852     }
1853     freeze();
1854     create_output_collection();
1855     cleanup();
1856     save_meta();
1857     exit 1;
1858   }
1859 }
1860
1861
1862 sub freeze
1863 {
1864   Log (undef, "Freeze not implemented");
1865   return;
1866 }
1867
1868
1869 sub thaw
1870 {
1871   croak ("Thaw not implemented");
1872 }
1873
1874
1875 sub freezequote
1876 {
1877   my $s = shift;
1878   $s =~ s/\\/\\\\/g;
1879   $s =~ s/\n/\\n/g;
1880   return $s;
1881 }
1882
1883
1884 sub freezeunquote
1885 {
1886   my $s = shift;
1887   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1888   return $s;
1889 }
1890
1891
1892 sub srun
1893 {
1894   my $srunargs = shift;
1895   my $execargs = shift;
1896   my $opts = shift || {};
1897   my $stdin = shift;
1898   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1899
1900   $Data::Dumper::Terse = 1;
1901   $Data::Dumper::Indent = 0;
1902   my $show_cmd = Dumper($args);
1903   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1904   $show_cmd =~ s/\n/ /g;
1905   if ($opts->{fork}) {
1906     Log(undef, "starting: $show_cmd");
1907   } else {
1908     # This is a child process: parent is in charge of reading our
1909     # stderr and copying it to Log() if needed.
1910     warn "starting: $show_cmd\n";
1911   }
1912
1913   if (defined $stdin) {
1914     my $child = open STDIN, "-|";
1915     defined $child or die "no fork: $!";
1916     if ($child == 0) {
1917       print $stdin or die $!;
1918       close STDOUT or die $!;
1919       exit 0;
1920     }
1921   }
1922
1923   return system (@$args) if $opts->{fork};
1924
1925   exec @$args;
1926   warn "ENV size is ".length(join(" ",%ENV));
1927   die "exec failed: $!: @$args";
1928 }
1929
1930
1931 sub ban_node_by_slot {
1932   # Don't start any new jobsteps on this node for 60 seconds
1933   my $slotid = shift;
1934   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1935   $slot[$slotid]->{node}->{hold_count}++;
1936   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1937 }
1938
1939 sub must_lock_now
1940 {
1941   my ($lockfile, $error_message) = @_;
1942   open L, ">", $lockfile or croak("$lockfile: $!");
1943   if (!flock L, LOCK_EX|LOCK_NB) {
1944     croak("Can't lock $lockfile: $error_message\n");
1945   }
1946 }
1947
1948 sub find_docker_image {
1949   # Given a Keep locator, check to see if it contains a Docker image.
1950   # If so, return its stream name and Docker hash.
1951   # If not, return undef for both values.
1952   my $locator = shift;
1953   my ($streamname, $filename);
1954   my $image = api_call("collections/get", uuid => $locator);
1955   if ($image) {
1956     foreach my $line (split(/\n/, $image->{manifest_text})) {
1957       my @tokens = split(/\s+/, $line);
1958       next if (!@tokens);
1959       $streamname = shift(@tokens);
1960       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1961         if (defined($filename)) {
1962           return (undef, undef);  # More than one file in the Collection.
1963         } else {
1964           $filename = (split(/:/, $filedata, 3))[2];
1965         }
1966       }
1967     }
1968   }
1969   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1970     return ($streamname, $1);
1971   } else {
1972     return (undef, undef);
1973   }
1974 }
1975
1976 sub retry_count {
1977   # Calculate the number of times an operation should be retried,
1978   # assuming exponential backoff, and that we're willing to retry as
1979   # long as tasks have been running.  Enforce a minimum of 3 retries.
1980   my ($starttime, $endtime, $timediff, $retries);
1981   if (@jobstep) {
1982     $starttime = $jobstep[0]->{starttime};
1983     $endtime = $jobstep[-1]->{finishtime};
1984   }
1985   if (!defined($starttime)) {
1986     $timediff = 0;
1987   } elsif (!defined($endtime)) {
1988     $timediff = time - $starttime;
1989   } else {
1990     $timediff = ($endtime - $starttime) - (time - $endtime);
1991   }
1992   if ($timediff > 0) {
1993     $retries = int(log($timediff) / log(2));
1994   } else {
1995     $retries = 1;  # Use the minimum.
1996   }
1997   return ($retries > 3) ? $retries : 3;
1998 }
1999
2000 sub retry_op {
2001   # Pass in two function references.
2002   # This method will be called with the remaining arguments.
2003   # If it dies, retry it with exponential backoff until it succeeds,
2004   # or until the current retry_count is exhausted.  After each failure
2005   # that can be retried, the second function will be called with
2006   # the current try count (0-based), next try time, and error message.
2007   my $operation = shift;
2008   my $retry_callback = shift;
2009   my $retries = retry_count();
2010   foreach my $try_count (0..$retries) {
2011     my $next_try = time + (2 ** $try_count);
2012     my $result = eval { $operation->(@_); };
2013     if (!$@) {
2014       return $result;
2015     } elsif ($try_count < $retries) {
2016       $retry_callback->($try_count, $next_try, $@);
2017       my $sleep_time = $next_try - time;
2018       sleep($sleep_time) if ($sleep_time > 0);
2019     }
2020   }
2021   # Ensure the error message ends in a newline, so Perl doesn't add
2022   # retry_op's line number to it.
2023   chomp($@);
2024   die($@ . "\n");
2025 }
2026
2027 sub api_call {
2028   # Pass in a /-separated API method name, and arguments for it.
2029   # This function will call that method, retrying as needed until
2030   # the current retry_count is exhausted, with a log on the first failure.
2031   my $method_name = shift;
2032   my $log_api_retry = sub {
2033     my ($try_count, $next_try_at, $errmsg) = @_;
2034     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2035     $errmsg =~ s/\s/ /g;
2036     $errmsg =~ s/\s+$//;
2037     my $retry_msg;
2038     if ($next_try_at < time) {
2039       $retry_msg = "Retrying.";
2040     } else {
2041       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2042       $retry_msg = "Retrying at $next_try_fmt.";
2043     }
2044     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2045   };
2046   my $method = $arv;
2047   foreach my $key (split(/\//, $method_name)) {
2048     $method = $method->{$key};
2049   }
2050   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2051 }
2052
2053 sub exit_status_s {
2054   # Given a $?, return a human-readable exit code string like "0" or
2055   # "1" or "0 with signal 1" or "1 with signal 11".
2056   my $exitcode = shift;
2057   my $s = $exitcode >> 8;
2058   if ($exitcode & 0x7f) {
2059     $s .= " with signal " . ($exitcode & 0x7f);
2060   }
2061   if ($exitcode & 0x80) {
2062     $s .= " with core dump";
2063   }
2064   return $s;
2065 }
2066
2067 sub handle_readall {
2068   # Pass in a glob reference to a file handle.
2069   # Read all its contents and return them as a string.
2070   my $fh_glob_ref = shift;
2071   local $/ = undef;
2072   return <$fh_glob_ref>;
2073 }
2074
2075 sub tar_filename_n {
2076   my $n = shift;
2077   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2078 }
2079
2080 sub add_git_archive {
2081   # Pass in a git archive command as a string or list, a la system().
2082   # This method will save its output to be included in the archive sent to the
2083   # build script.
2084   my $git_input;
2085   $git_tar_count++;
2086   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2087     croak("Failed to save git archive: $!");
2088   }
2089   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2090   close($git_input);
2091   waitpid($git_pid, 0);
2092   close(GIT_ARCHIVE);
2093   if ($?) {
2094     croak("Failed to save git archive: git exited " . exit_status_s($?));
2095   }
2096 }
2097
2098 sub combined_git_archive {
2099   # Combine all saved tar archives into a single archive, then return its
2100   # contents in a string.  Return undef if no archives have been saved.
2101   if ($git_tar_count < 1) {
2102     return undef;
2103   }
2104   my $base_tar_name = tar_filename_n(1);
2105   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2106     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2107     if ($tar_exit != 0) {
2108       croak("Error preparing build archive: tar -A exited " .
2109             exit_status_s($tar_exit));
2110     }
2111   }
2112   if (!open(GIT_TAR, "<", $base_tar_name)) {
2113     croak("Could not open build archive: $!");
2114   }
2115   my $tar_contents = handle_readall(\*GIT_TAR);
2116   close(GIT_TAR);
2117   return $tar_contents;
2118 }
2119
2120 sub set_nonblocking {
2121   my $fh = shift;
2122   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2123   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2124 }
2125
2126 __DATA__
2127 #!/usr/bin/env perl
2128 #
2129 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2130 # server invokes this script on individual compute nodes, or localhost if we're
2131 # running a job locally.  It gets called in two modes:
2132 #
2133 # * No arguments: Installation mode.  Read a tar archive from the DATA
2134 #   file handle; it includes the Crunch script's source code, and
2135 #   maybe SDKs as well.  Those should be installed in the proper
2136 #   locations.  This runs outside of any Docker container, so don't try to
2137 #   introspect Crunch's runtime environment.
2138 #
2139 # * With arguments: Crunch script run mode.  This script should set up the
2140 #   environment, then run the command specified in the arguments.  This runs
2141 #   inside any Docker container.
2142
2143 use Fcntl ':flock';
2144 use File::Path qw( make_path remove_tree );
2145 use POSIX qw(getcwd);
2146
2147 use constant TASK_TEMPFAIL => 111;
2148
2149 # Map SDK subdirectories to the path environments they belong to.
2150 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2151
2152 my $destdir = $ENV{"CRUNCH_SRC"};
2153 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2154 my $repo = $ENV{"CRUNCH_SRC_URL"};
2155 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2156 my $job_work = $ENV{"JOB_WORK"};
2157 my $task_work = $ENV{"TASK_WORK"};
2158
2159 open(STDOUT_ORIG, ">&", STDOUT);
2160 open(STDERR_ORIG, ">&", STDERR);
2161
2162 for my $dir ($destdir, $job_work, $task_work) {
2163   if ($dir) {
2164     make_path $dir;
2165     -e $dir or die "Failed to create temporary directory ($dir): $!";
2166   }
2167 }
2168
2169 if ($task_work) {
2170   remove_tree($task_work, {keep_root => 1});
2171 }
2172
2173 ### Crunch script run mode
2174 if (@ARGV) {
2175   # We want to do routine logging during task 0 only.  This gives the user
2176   # the information they need, but avoids repeating the information for every
2177   # task.
2178   my $Log;
2179   if ($ENV{TASK_SEQUENCE} eq "0") {
2180     $Log = sub {
2181       my $msg = shift;
2182       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2183     };
2184   } else {
2185     $Log = sub { };
2186   }
2187
2188   my $python_src = "$install_dir/python";
2189   my $venv_dir = "$job_work/.arvados.venv";
2190   my $venv_built = -e "$venv_dir/bin/activate";
2191   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2192     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2193                  "--python=python2.7", $venv_dir);
2194     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2195     $venv_built = 1;
2196     $Log->("Built Python SDK virtualenv");
2197   }
2198
2199   my @pysdk_version_cmd = ("python", "-c",
2200     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2201   if ($venv_built) {
2202     $Log->("Running in Python SDK virtualenv");
2203     @pysdk_version_cmd = ();
2204     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2205     @ARGV = ("/bin/sh", "-ec",
2206              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2207   } elsif (-d $python_src) {
2208     $Log->("Warning: virtualenv not found inside Docker container default " .
2209            "\$PATH. Can't install Python SDK.");
2210   }
2211
2212   if (@pysdk_version_cmd) {
2213     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2214     my $pysdk_version = <$pysdk_version_pipe>;
2215     close($pysdk_version_pipe);
2216     if ($? == 0) {
2217       chomp($pysdk_version);
2218       $Log->("Using Arvados SDK version $pysdk_version");
2219     } else {
2220       # A lot could've gone wrong here, but pretty much all of it means that
2221       # Python won't be able to load the Arvados SDK.
2222       $Log->("Warning: Arvados SDK not found");
2223     }
2224   }
2225
2226   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2227     my $sdk_path = "$install_dir/$sdk_dir";
2228     if (-d $sdk_path) {
2229       if ($ENV{$sdk_envkey}) {
2230         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2231       } else {
2232         $ENV{$sdk_envkey} = $sdk_path;
2233       }
2234       $Log->("Arvados SDK added to %s", $sdk_envkey);
2235     }
2236   }
2237
2238   exec(@ARGV);
2239   die "Cannot exec `@ARGV`: $!";
2240 }
2241
2242 ### Installation mode
2243 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2244 flock L, LOCK_EX;
2245 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2246   # This exact git archive (source + arvados sdk) is already installed
2247   # here, so there's no need to reinstall it.
2248
2249   # We must consume our DATA section, though: otherwise the process
2250   # feeding it to us will get SIGPIPE.
2251   my $buf;
2252   while (read(DATA, $buf, 65536)) { }
2253
2254   exit(0);
2255 }
2256
2257 unlink "$destdir.archive_hash";
2258 mkdir $destdir;
2259
2260 do {
2261   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2262   local $SIG{PIPE} = "IGNORE";
2263   warn "Extracting archive: $archive_hash\n";
2264   # --ignore-zeros is necessary sometimes: depending on how much NUL
2265   # padding tar -A put on our combined archive (which in turn depends
2266   # on the length of the component archives) tar without
2267   # --ignore-zeros will exit before consuming stdin and cause close()
2268   # to fail on the resulting SIGPIPE.
2269   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2270     die "Error launching 'tar -xC $destdir': $!";
2271   }
2272   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2273   # get SIGPIPE.  We must feed it data incrementally.
2274   my $tar_input;
2275   while (read(DATA, $tar_input, 65536)) {
2276     print TARX $tar_input;
2277   }
2278   if(!close(TARX)) {
2279     die "'tar -xC $destdir' exited $?: $!";
2280   }
2281 };
2282
2283 mkdir $install_dir;
2284
2285 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2286 if (-d $sdk_root) {
2287   foreach my $sdk_lang (("python",
2288                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2289     if (-d "$sdk_root/$sdk_lang") {
2290       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2291         die "Failed to install $sdk_lang SDK: $!";
2292       }
2293     }
2294   }
2295 }
2296
2297 my $python_dir = "$install_dir/python";
2298 if ((-d $python_dir) and can_run("python2.7")) {
2299   open(my $egg_info_pipe, "-|",
2300        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2301   my @egg_info_errors = <$egg_info_pipe>;
2302   close($egg_info_pipe);
2303
2304   if ($?) {
2305     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2306       # egg_info apparently failed because it couldn't ask git for a build tag.
2307       # Specify no build tag.
2308       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2309       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2310       close($pysdk_cfg);
2311     } else {
2312       my $egg_info_exit = $? >> 8;
2313       foreach my $errline (@egg_info_errors) {
2314         warn $errline;
2315       }
2316       warn "python setup.py egg_info failed: exit $egg_info_exit";
2317       exit ($egg_info_exit || 1);
2318     }
2319   }
2320 }
2321
2322 # Hide messages from the install script (unless it fails: shell_or_die
2323 # will show $destdir.log in that case).
2324 open(STDOUT, ">>", "$destdir.log");
2325 open(STDERR, ">&", STDOUT);
2326
2327 if (-e "$destdir/crunch_scripts/install") {
2328     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2329 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2330     # Old version
2331     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2332 } elsif (-e "./install.sh") {
2333     shell_or_die (undef, "./install.sh", $install_dir);
2334 }
2335
2336 if ($archive_hash) {
2337     unlink "$destdir.archive_hash.new";
2338     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2339     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2340 }
2341
2342 close L;
2343
2344 sub can_run {
2345   my $command_name = shift;
2346   open(my $which, "-|", "which", $command_name);
2347   while (<$which>) { }
2348   close($which);
2349   return ($? == 0);
2350 }
2351
2352 sub shell_or_die
2353 {
2354   my $exitcode = shift;
2355
2356   if ($ENV{"DEBUG"}) {
2357     print STDERR "@_\n";
2358   }
2359   if (system (@_) != 0) {
2360     my $err = $!;
2361     my $code = $?;
2362     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2363     open STDERR, ">&STDERR_ORIG";
2364     system ("cat $destdir.log >&2");
2365     warn "@_ failed ($err): $exitstatus";
2366     if (defined($exitcode)) {
2367       exit $exitcode;
2368     }
2369     else {
2370       exit (($code >> 8) || 1);
2371     }
2372   }
2373 }
2374
2375 __DATA__