8284: Fix confusion between %proc and %jobstep.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 my $docker_run_args = "";
131 GetOptions('force-unlock' => \$force_unlock,
132            'git-dir=s' => \$git_dir,
133            'job=s' => \$jobspec,
134            'job-api-token=s' => \$job_api_token,
135            'no-clear-tmp' => \$no_clear_tmp,
136            'resume-stash=s' => \$resume_stash,
137            'docker-bin=s' => \$docker_bin,
138            'docker-run-args=s' => \$docker_run_args,
139     );
140
141 if (defined $job_api_token) {
142   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
143 }
144
145 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
146
147
148 $SIG{'USR1'} = sub
149 {
150   $main::ENV{CRUNCH_DEBUG} = 1;
151 };
152 $SIG{'USR2'} = sub
153 {
154   $main::ENV{CRUNCH_DEBUG} = 0;
155 };
156
157 my $arv = Arvados->new('apiVersion' => 'v1');
158
159 my $Job;
160 my $job_id;
161 my $dbh;
162 my $sth;
163 my @jobstep;
164
165 my $local_job;
166 if ($jobspec =~ /^[-a-z\d]+$/)
167 {
168   # $jobspec is an Arvados UUID, not a JSON job specification
169   $Job = api_call("jobs/get", uuid => $jobspec);
170   $local_job = 0;
171 }
172 else
173 {
174   $local_job = JSON::decode_json($jobspec);
175 }
176
177
178 # Make sure our workers (our slurm nodes, localhost, or whatever) are
179 # at least able to run basic commands: they aren't down or severely
180 # misconfigured.
181 my $cmd = ['true'];
182 if (($Job || $local_job)->{docker_image_locator}) {
183   $cmd = [$docker_bin, 'ps', '-q'];
184 }
185 Log(undef, "Sanity check is `@$cmd`");
186 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
187      $cmd,
188      {fork => 1});
189 if ($? != 0) {
190   Log(undef, "Sanity check failed: ".exit_status_s($?));
191   exit EX_TEMPFAIL;
192 }
193 Log(undef, "Sanity check OK");
194
195
196 my $User = api_call("users/current");
197
198 if (!$local_job) {
199   if (!$force_unlock) {
200     # Claim this job, and make sure nobody else does
201     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
202     if ($@) {
203       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
204       exit EX_TEMPFAIL;
205     };
206   }
207 }
208 else
209 {
210   if (!$resume_stash)
211   {
212     map { croak ("No $_ specified") unless $local_job->{$_} }
213     qw(script script_version script_parameters);
214   }
215
216   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
217   $local_job->{'started_at'} = gmtime;
218   $local_job->{'state'} = 'Running';
219
220   $Job = api_call("jobs/create", job => $local_job);
221 }
222 $job_id = $Job->{'uuid'};
223
224 my $keep_logfile = $job_id . '.log.txt';
225 log_writer_start($keep_logfile);
226
227 $Job->{'runtime_constraints'} ||= {};
228 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
229 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
230
231 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
232 if ($? == 0) {
233   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
234   chomp($gem_versions);
235   chop($gem_versions);  # Closing parentheses
236 } else {
237   $gem_versions = "";
238 }
239 Log(undef,
240     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
241
242 Log (undef, "check slurm allocation");
243 my @slot;
244 my @node;
245 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
246 my @sinfo;
247 if (!$have_slurm)
248 {
249   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
250   push @sinfo, "$localcpus localhost";
251 }
252 if (exists $ENV{SLURM_NODELIST})
253 {
254   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
255 }
256 foreach (@sinfo)
257 {
258   my ($ncpus, $slurm_nodelist) = split;
259   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
260
261   my @nodelist;
262   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
263   {
264     my $nodelist = $1;
265     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
266     {
267       my $ranges = $1;
268       foreach (split (",", $ranges))
269       {
270         my ($a, $b);
271         if (/(\d+)-(\d+)/)
272         {
273           $a = $1;
274           $b = $2;
275         }
276         else
277         {
278           $a = $_;
279           $b = $_;
280         }
281         push @nodelist, map {
282           my $n = $nodelist;
283           $n =~ s/\[[-,\d]+\]/$_/;
284           $n;
285         } ($a..$b);
286       }
287     }
288     else
289     {
290       push @nodelist, $nodelist;
291     }
292   }
293   foreach my $nodename (@nodelist)
294   {
295     Log (undef, "node $nodename - $ncpus slots");
296     my $node = { name => $nodename,
297                  ncpus => $ncpus,
298                  # The number of consecutive times a task has been dispatched
299                  # to this node and failed.
300                  losing_streak => 0,
301                  # The number of consecutive times that SLURM has reported
302                  # a node failure since the last successful task.
303                  fail_count => 0,
304                  # Don't dispatch work to this node until this time
305                  # (in seconds since the epoch) has passed.
306                  hold_until => 0 };
307     foreach my $cpu (1..$ncpus)
308     {
309       push @slot, { node => $node,
310                     cpu => $cpu };
311     }
312   }
313   push @node, @nodelist;
314 }
315
316
317
318 # Ensure that we get one jobstep running on each allocated node before
319 # we start overloading nodes with concurrent steps
320
321 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
322
323
324 $Job->update_attributes(
325   'tasks_summary' => { 'failed' => 0,
326                        'todo' => 1,
327                        'running' => 0,
328                        'done' => 0 });
329
330 Log (undef, "start");
331 $SIG{'INT'} = sub { $main::please_freeze = 1; };
332 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
333 $SIG{'TERM'} = \&croak;
334 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
335 $SIG{'ALRM'} = sub { $main::please_info = 1; };
336 $SIG{'CONT'} = sub { $main::please_continue = 1; };
337 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
338
339 $main::please_freeze = 0;
340 $main::please_info = 0;
341 $main::please_continue = 0;
342 $main::please_refresh = 0;
343 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
344
345 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
346 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
347 $ENV{"JOB_UUID"} = $job_id;
348
349
350 my @jobstep_todo = ();
351 my @jobstep_done = ();
352 my @jobstep_tomerge = ();
353 my $jobstep_tomerge_level = 0;
354 my $squeue_checked = 0;
355 my $latest_refresh = scalar time;
356
357
358
359 if (defined $Job->{thawedfromkey})
360 {
361   thaw ($Job->{thawedfromkey});
362 }
363 else
364 {
365   my $first_task = api_call("job_tasks/create", job_task => {
366     'job_uuid' => $Job->{'uuid'},
367     'sequence' => 0,
368     'qsequence' => 0,
369     'parameters' => {},
370   });
371   push @jobstep, { 'level' => 0,
372                    'failures' => 0,
373                    'arvados_task' => $first_task,
374                  };
375   push @jobstep_todo, 0;
376 }
377
378
379 if (!$have_slurm)
380 {
381   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
382 }
383
384 my $build_script = handle_readall(\*DATA);
385 my $nodelist = join(",", @node);
386 my $git_tar_count = 0;
387
388 if (!defined $no_clear_tmp) {
389   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
390   Log (undef, "Clean work dirs");
391
392   my $cleanpid = fork();
393   if ($cleanpid == 0)
394   {
395     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
396     # Then clean up work directories.
397     # TODO: When #5036 is done and widely deployed, we can limit mount's
398     # -t option to simply fuse.keep.
399     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
400           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
401     exit (1);
402   }
403   while (1)
404   {
405     last if $cleanpid == waitpid (-1, WNOHANG);
406     freeze_if_want_freeze ($cleanpid);
407     select (undef, undef, undef, 0.1);
408   }
409   if ($?) {
410     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
411     exit(EX_RETRY_UNLOCKED);
412   }
413 }
414
415 # If this job requires a Docker image, install that.
416 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
417 if ($docker_locator = $Job->{docker_image_locator}) {
418   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
419   if (!$docker_hash)
420   {
421     croak("No Docker image hash found from locator $docker_locator");
422   }
423   $docker_stream =~ s/^\.//;
424   my $docker_install_script = qq{
425 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
426     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
427 fi
428 };
429   my $docker_pid = fork();
430   if ($docker_pid == 0)
431   {
432     srun (["srun", "--nodelist=" . join(',', @node)],
433           ["/bin/sh", "-ec", $docker_install_script]);
434     exit ($?);
435   }
436   while (1)
437   {
438     last if $docker_pid == waitpid (-1, WNOHANG);
439     freeze_if_want_freeze ($docker_pid);
440     select (undef, undef, undef, 0.1);
441   }
442   if ($? != 0)
443   {
444     croak("Installing Docker image from $docker_locator exited "
445           .exit_status_s($?));
446   }
447
448   # Determine whether this version of Docker supports memory+swap limits.
449   srun(["srun", "--nodelist=" . $node[0]],
450        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
451       {fork => 1});
452   $docker_limitmem = ($? == 0);
453
454   # Find a non-root Docker user to use.
455   # Tries the default user for the container, then 'crunch', then 'nobody',
456   # testing for whether the actual user id is non-zero.  This defends against
457   # mistakes but not malice, but we intend to harden the security in the future
458   # so we don't want anyone getting used to their jobs running as root in their
459   # Docker containers.
460   my @tryusers = ("", "crunch", "nobody");
461   foreach my $try_user (@tryusers) {
462     my $try_user_arg;
463     if ($try_user eq "") {
464       Log(undef, "Checking if container default user is not UID 0");
465       $try_user_arg = "";
466     } else {
467       Log(undef, "Checking if user '$try_user' is not UID 0");
468       $try_user_arg = "--user=$try_user";
469     }
470     srun(["srun", "--nodelist=" . $node[0]],
471          ["/bin/sh", "-ec",
472           "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
473           " test \$a -ne 0"],
474          {fork => 1});
475     if ($? == 0) {
476       $dockeruserarg = $try_user_arg;
477       if ($try_user eq "") {
478         Log(undef, "Container will run with default user");
479       } else {
480         Log(undef, "Container will run with $dockeruserarg");
481       }
482       last;
483     }
484   }
485
486   if (!defined $dockeruserarg) {
487     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
488   }
489
490   if ($Job->{arvados_sdk_version}) {
491     # The job also specifies an Arvados SDK version.  Add the SDKs to the
492     # tar file for the build script to install.
493     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
494                        $Job->{arvados_sdk_version}));
495     add_git_archive("git", "--git-dir=$git_dir", "archive",
496                     "--prefix=.arvados.sdk/",
497                     $Job->{arvados_sdk_version}, "sdk");
498   }
499 }
500
501 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
502   # If script_version looks like an absolute path, *and* the --git-dir
503   # argument was not given -- which implies we were not invoked by
504   # crunch-dispatch -- we will use the given path as a working
505   # directory instead of resolving script_version to a git commit (or
506   # doing anything else with git).
507   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
508   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
509 }
510 else {
511   # Resolve the given script_version to a git commit sha1. Also, if
512   # the repository is remote, clone it into our local filesystem: this
513   # ensures "git archive" will work, and is necessary to reliably
514   # resolve a symbolic script_version like "master^".
515   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
516
517   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
518
519   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
520
521   # If we're running under crunch-dispatch, it will have already
522   # pulled the appropriate source tree into its own repository, and
523   # given us that repo's path as $git_dir.
524   #
525   # If we're running a "local" job, we might have to fetch content
526   # from a remote repository.
527   #
528   # (Currently crunch-dispatch gives a local path with --git-dir, but
529   # we might as well accept URLs there too in case it changes its
530   # mind.)
531   my $repo = $git_dir || $Job->{'repository'};
532
533   # Repository can be remote or local. If remote, we'll need to fetch it
534   # to a local dir before doing `git log` et al.
535   my $repo_location;
536
537   if ($repo =~ m{://|^[^/]*:}) {
538     # $repo is a git url we can clone, like git:// or https:// or
539     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
540     # not recognized here because distinguishing that from a local
541     # path is too fragile. If you really need something strange here,
542     # use the ssh:// form.
543     $repo_location = 'remote';
544   } elsif ($repo =~ m{^\.*/}) {
545     # $repo is a local path to a git index. We'll also resolve ../foo
546     # to ../foo/.git if the latter is a directory. To help
547     # disambiguate local paths from named hosted repositories, this
548     # form must be given as ./ or ../ if it's a relative path.
549     if (-d "$repo/.git") {
550       $repo = "$repo/.git";
551     }
552     $repo_location = 'local';
553   } else {
554     # $repo is none of the above. It must be the name of a hosted
555     # repository.
556     my $arv_repo_list = api_call("repositories/list",
557                                  'filters' => [['name','=',$repo]]);
558     my @repos_found = @{$arv_repo_list->{'items'}};
559     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
560     if ($n_found > 0) {
561       Log(undef, "Repository '$repo' -> "
562           . join(", ", map { $_->{'uuid'} } @repos_found));
563     }
564     if ($n_found != 1) {
565       croak("Error: Found $n_found repositories with name '$repo'.");
566     }
567     $repo = $repos_found[0]->{'fetch_url'};
568     $repo_location = 'remote';
569   }
570   Log(undef, "Using $repo_location repository '$repo'");
571   $ENV{"CRUNCH_SRC_URL"} = $repo;
572
573   # Resolve given script_version (we'll call that $treeish here) to a
574   # commit sha1 ($commit).
575   my $treeish = $Job->{'script_version'};
576   my $commit;
577   if ($repo_location eq 'remote') {
578     # We minimize excess object-fetching by re-using the same bare
579     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
580     # just keep adding remotes to it as needed.
581     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
582     my $gitcmd = "git --git-dir=\Q$local_repo\E";
583
584     # Set up our local repo for caching remote objects, making
585     # archives, etc.
586     if (!-d $local_repo) {
587       make_path($local_repo) or croak("Error: could not create $local_repo");
588     }
589     # This works (exits 0 and doesn't delete fetched objects) even
590     # if $local_repo is already initialized:
591     `$gitcmd init --bare`;
592     if ($?) {
593       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
594     }
595
596     # If $treeish looks like a hash (or abbrev hash) we look it up in
597     # our local cache first, since that's cheaper. (We don't want to
598     # do that with tags/branches though -- those change over time, so
599     # they should always be resolved by the remote repo.)
600     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
601       # Hide stderr because it's normal for this to fail:
602       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
603       if ($? == 0 &&
604           # Careful not to resolve a branch named abcdeff to commit 1234567:
605           $sha1 =~ /^$treeish/ &&
606           $sha1 =~ /^([0-9a-f]{40})$/s) {
607         $commit = $1;
608         Log(undef, "Commit $commit already present in $local_repo");
609       }
610     }
611
612     if (!defined $commit) {
613       # If $treeish isn't just a hash or abbrev hash, or isn't here
614       # yet, we need to fetch the remote to resolve it correctly.
615
616       # First, remove all local heads. This prevents a name that does
617       # not exist on the remote from resolving to (or colliding with)
618       # a previously fetched branch or tag (possibly from a different
619       # remote).
620       remove_tree("$local_repo/refs/heads", {keep_root => 1});
621
622       Log(undef, "Fetching objects from $repo to $local_repo");
623       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
624       if ($?) {
625         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
626       }
627     }
628
629     # Now that the data is all here, we will use our local repo for
630     # the rest of our git activities.
631     $repo = $local_repo;
632   }
633
634   my $gitcmd = "git --git-dir=\Q$repo\E";
635   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
636   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
637     croak("`$gitcmd rev-list` exited "
638           .exit_status_s($?)
639           .", '$treeish' not found, giving up");
640   }
641   $commit = $1;
642   Log(undef, "Version $treeish is commit $commit");
643
644   if ($commit ne $Job->{'script_version'}) {
645     # Record the real commit id in the database, frozentokey, logs,
646     # etc. -- instead of an abbreviation or a branch name which can
647     # become ambiguous or point to a different commit in the future.
648     if (!$Job->update_attributes('script_version' => $commit)) {
649       croak("Error: failed to update job's script_version attribute");
650     }
651   }
652
653   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
654   add_git_archive("$gitcmd archive ''\Q$commit\E");
655 }
656
657 my $git_archive = combined_git_archive();
658 if (!defined $git_archive) {
659   Log(undef, "Skip install phase (no git archive)");
660   if ($have_slurm) {
661     Log(undef, "Warning: This probably means workers have no source tree!");
662   }
663 }
664 else {
665   my $install_exited;
666   my $install_script_tries_left = 3;
667   for (my $attempts = 0; $attempts < 3; $attempts++) {
668     Log(undef, "Run install script on all workers");
669
670     my @srunargs = ("srun",
671                     "--nodelist=$nodelist",
672                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
673     my @execargs = ("sh", "-c",
674                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
675
676     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
677     my ($install_stderr_r, $install_stderr_w);
678     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
679     set_nonblocking($install_stderr_r);
680     my $installpid = fork();
681     if ($installpid == 0)
682     {
683       close($install_stderr_r);
684       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
685       open(STDOUT, ">&", $install_stderr_w);
686       open(STDERR, ">&", $install_stderr_w);
687       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
688       exit (1);
689     }
690     close($install_stderr_w);
691     # Tell freeze_if_want_freeze how to kill the child, otherwise the
692     # "waitpid(installpid)" loop won't get interrupted by a freeze:
693     $proc{$installpid} = {};
694     my $stderr_buf = '';
695     # Track whether anything appears on stderr other than slurm errors
696     # ("srun: ...") and the "starting: ..." message printed by the
697     # srun subroutine itself:
698     my $stderr_anything_from_script = 0;
699     my $match_our_own_errors = '^(srun: error: |starting: \[)';
700     while ($installpid != waitpid(-1, WNOHANG)) {
701       freeze_if_want_freeze ($installpid);
702       # Wait up to 0.1 seconds for something to appear on stderr, then
703       # do a non-blocking read.
704       my $bits = fhbits($install_stderr_r);
705       select ($bits, undef, $bits, 0.1);
706       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
707       {
708         while ($stderr_buf =~ /^(.*?)\n/) {
709           my $line = $1;
710           substr $stderr_buf, 0, 1+length($line), "";
711           Log(undef, "stderr $line");
712           if ($line !~ /$match_our_own_errors/) {
713             $stderr_anything_from_script = 1;
714           }
715         }
716       }
717     }
718     delete $proc{$installpid};
719     $install_exited = $?;
720     close($install_stderr_r);
721     if (length($stderr_buf) > 0) {
722       if ($stderr_buf !~ /$match_our_own_errors/) {
723         $stderr_anything_from_script = 1;
724       }
725       Log(undef, "stderr $stderr_buf")
726     }
727
728     Log (undef, "Install script exited ".exit_status_s($install_exited));
729     last if $install_exited == 0 || $main::please_freeze;
730     # If the install script fails but doesn't print an error message,
731     # the next thing anyone is likely to do is just run it again in
732     # case it was a transient problem like "slurm communication fails
733     # because the network isn't reliable enough". So we'll just do
734     # that ourselves (up to 3 attempts in total). OTOH, if there is an
735     # error message, the problem is more likely to have a real fix and
736     # we should fail the job so the fixing process can start, instead
737     # of doing 2 more attempts.
738     last if $stderr_anything_from_script;
739   }
740
741   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
742     unlink($tar_filename);
743   }
744
745   if ($install_exited != 0) {
746     croak("Giving up");
747   }
748 }
749
750 foreach (qw (script script_version script_parameters runtime_constraints))
751 {
752   Log (undef,
753        "$_ " .
754        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
755 }
756 foreach (split (/\n/, $Job->{knobs}))
757 {
758   Log (undef, "knob " . $_);
759 }
760
761
762
763 $main::success = undef;
764
765
766
767 ONELEVEL:
768
769 my $thisround_succeeded = 0;
770 my $thisround_failed = 0;
771 my $thisround_failed_multiple = 0;
772 my $working_slot_count = scalar(@slot);
773
774 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
775                        or $a <=> $b } @jobstep_todo;
776 my $level = $jobstep[$jobstep_todo[0]]->{level};
777
778 my $initial_tasks_this_level = 0;
779 foreach my $id (@jobstep_todo) {
780   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
781 }
782
783 # If the number of tasks scheduled at this level #T is smaller than the number
784 # of slots available #S, only use the first #T slots, or the first slot on
785 # each node, whichever number is greater.
786 #
787 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
788 # based on these numbers.  Using fewer slots makes more resources available
789 # to each individual task, which should normally be a better strategy when
790 # there are fewer of them running with less parallelism.
791 #
792 # Note that this calculation is not redone if the initial tasks at
793 # this level queue more tasks at the same level.  This may harm
794 # overall task throughput for that level.
795 my @freeslot;
796 if ($initial_tasks_this_level < @node) {
797   @freeslot = (0..$#node);
798 } elsif ($initial_tasks_this_level < @slot) {
799   @freeslot = (0..$initial_tasks_this_level - 1);
800 } else {
801   @freeslot = (0..$#slot);
802 }
803 my $round_num_freeslots = scalar(@freeslot);
804
805 my %round_max_slots = ();
806 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
807   my $this_slot = $slot[$freeslot[$ii]];
808   my $node_name = $this_slot->{node}->{name};
809   $round_max_slots{$node_name} ||= $this_slot->{cpu};
810   last if (scalar(keys(%round_max_slots)) >= @node);
811 }
812
813 Log(undef, "start level $level with $round_num_freeslots slots");
814 my @holdslot;
815 my %reader;
816 my $progress_is_dirty = 1;
817 my $progress_stats_updated = 0;
818
819 update_progress_stats();
820
821
822 THISROUND:
823 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
824 {
825   # Don't create new tasks if we already know the job's final result.
826   last if defined($main::success);
827
828   my $id = $jobstep_todo[$todo_ptr];
829   my $Jobstep = $jobstep[$id];
830   if ($Jobstep->{level} != $level)
831   {
832     next;
833   }
834
835   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
836   set_nonblocking($reader{$id});
837
838   my $childslot = $freeslot[0];
839   my $childnode = $slot[$childslot]->{node};
840   my $childslotname = join (".",
841                             $slot[$childslot]->{node}->{name},
842                             $slot[$childslot]->{cpu});
843
844   my $childpid = fork();
845   if ($childpid == 0)
846   {
847     $SIG{'INT'} = 'DEFAULT';
848     $SIG{'QUIT'} = 'DEFAULT';
849     $SIG{'TERM'} = 'DEFAULT';
850
851     foreach (values (%reader))
852     {
853       close($_);
854     }
855     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
856     open(STDOUT,">&writer");
857     open(STDERR,">&writer");
858
859     undef $dbh;
860     undef $sth;
861
862     delete $ENV{"GNUPGHOME"};
863     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
864     $ENV{"TASK_QSEQUENCE"} = $id;
865     $ENV{"TASK_SEQUENCE"} = $level;
866     $ENV{"JOB_SCRIPT"} = $Job->{script};
867     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
868       $param =~ tr/a-z/A-Z/;
869       $ENV{"JOB_PARAMETER_$param"} = $value;
870     }
871     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
872     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
873     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
874     $ENV{"HOME"} = $ENV{"TASK_WORK"};
875     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
876     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
877     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
878
879     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
880
881     $ENV{"GZIP"} = "-n";
882
883     my @srunargs = (
884       "srun",
885       "--nodelist=".$childnode->{name},
886       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
887       "--job-name=$job_id.$id.$$",
888         );
889
890     my $stdbuf = " stdbuf --output=0 --error=0 ";
891
892     my $arv_file_cache = "";
893     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
894       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
895     }
896
897     my $command =
898         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
899         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
900         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
901         # These environment variables get used explicitly later in
902         # $command.  No tool is expected to read these values directly.
903         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
904         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
905         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
906         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
907
908     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
909     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
910     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
911
912     if ($docker_hash)
913     {
914       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
915       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
916       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
917       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
918       # We only set memory limits if Docker lets us limit both memory and swap.
919       # Memory limits alone have been supported longer, but subprocesses tend
920       # to get SIGKILL if they exceed that without any swap limit set.
921       # See #5642 for additional background.
922       if ($docker_limitmem) {
923         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
924       }
925
926       # The source tree and $destdir directory (which we have
927       # installed on the worker host) are available in the container,
928       # under the same path.
929       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
930       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
931
932       # Currently, we make the "by_pdh" directory in arv-mount's mount
933       # point appear at /keep inside the container (instead of using
934       # the same path as the host like we do with CRUNCH_SRC and
935       # CRUNCH_INSTALL). However, crunch scripts and utilities must
936       # not rely on this. They must use $TASK_KEEPMOUNT.
937       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
938       $ENV{TASK_KEEPMOUNT} = "/keep";
939
940       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
941       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
942       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
943
944       # TASK_WORK is almost exactly like a docker data volume: it
945       # starts out empty, is writable, and persists until no
946       # containers use it any more. We don't use --volumes-from to
947       # share it with other containers: it is only accessible to this
948       # task, and it goes away when this task stops.
949       #
950       # However, a docker data volume is writable only by root unless
951       # the mount point already happens to exist in the container with
952       # different permissions. Therefore, we [1] assume /tmp already
953       # exists in the image and is writable by the crunch user; [2]
954       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
955       # writable if they are created by docker while setting up the
956       # other --volumes); and [3] create $TASK_WORK inside the
957       # container using $build_script.
958       $command .= "--volume=/tmp ";
959       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
960       $ENV{"HOME"} = $ENV{"TASK_WORK"};
961       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
962
963       # TODO: Share a single JOB_WORK volume across all task
964       # containers on a given worker node, and delete it when the job
965       # ends (and, in case that doesn't work, when the next job
966       # starts).
967       #
968       # For now, use the same approach as TASK_WORK above.
969       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
970
971       while (my ($env_key, $env_val) = each %ENV)
972       {
973         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
974           $command .= "--env=\Q$env_key=$env_val\E ";
975         }
976       }
977       $command .= "--env=\QHOME=$ENV{HOME}\E ";
978       $command .= "\Q$docker_hash\E ";
979
980       if ($Job->{arvados_sdk_version}) {
981         $command .= $stdbuf;
982         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
983       } else {
984         $command .= "/bin/sh -c \'python -c " .
985             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
986             ">&2 2>/dev/null; " .
987             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
988             "if which stdbuf >/dev/null ; then " .
989             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
990             " else " .
991             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
992             " fi\'";
993       }
994     } else {
995       # Non-docker run
996       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
997       $command .= $stdbuf;
998       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
999     }
1000
1001     my @execargs = ('bash', '-c', $command);
1002     srun (\@srunargs, \@execargs, undef, $build_script);
1003     # exec() failed, we assume nothing happened.
1004     die "srun() failed on build script\n";
1005   }
1006   close("writer");
1007   if (!defined $childpid)
1008   {
1009     close $reader{$id};
1010     delete $reader{$id};
1011     next;
1012   }
1013   shift @freeslot;
1014   $proc{$childpid} = { jobstep => $id,
1015                        time => time,
1016                        slot => $childslot,
1017                        jobstepname => "$job_id.$id.$childpid",
1018                      };
1019   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1020   $slot[$childslot]->{pid} = $childpid;
1021
1022   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1023   Log ($id, "child $childpid started on $childslotname");
1024   $Jobstep->{starttime} = time;
1025   $Jobstep->{node} = $childnode->{name};
1026   $Jobstep->{slotindex} = $childslot;
1027   delete $Jobstep->{stderr};
1028   delete $Jobstep->{finishtime};
1029   delete $Jobstep->{tempfail};
1030
1031   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1032   $Jobstep->{'arvados_task'}->save;
1033
1034   splice @jobstep_todo, $todo_ptr, 1;
1035   --$todo_ptr;
1036
1037   $progress_is_dirty = 1;
1038
1039   while (!@freeslot
1040          ||
1041          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1042   {
1043     last THISROUND if $main::please_freeze;
1044     if ($main::please_info)
1045     {
1046       $main::please_info = 0;
1047       freeze();
1048       create_output_collection();
1049       save_meta(1);
1050       update_progress_stats();
1051     }
1052     my $gotsome
1053         = readfrompipes ()
1054         + reapchildren ();
1055     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1056     {
1057       check_refresh_wanted();
1058       check_squeue();
1059       update_progress_stats();
1060       select (undef, undef, undef, 0.1);
1061     }
1062     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1063     {
1064       update_progress_stats();
1065     }
1066     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1067                                         $_->{node}->{hold_count} < 4 } @slot);
1068     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1069         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1070     {
1071       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1072           .($thisround_failed+$thisround_succeeded)
1073           .") -- giving up on this round";
1074       Log (undef, $message);
1075       last THISROUND;
1076     }
1077
1078     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1079     for (my $i=$#freeslot; $i>=0; $i--) {
1080       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1081         push @holdslot, (splice @freeslot, $i, 1);
1082       }
1083     }
1084     for (my $i=$#holdslot; $i>=0; $i--) {
1085       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1086         push @freeslot, (splice @holdslot, $i, 1);
1087       }
1088     }
1089
1090     # give up if no nodes are succeeding
1091     if ($working_slot_count < 1) {
1092       Log(undef, "Every node has failed -- giving up");
1093       last THISROUND;
1094     }
1095   }
1096 }
1097
1098
1099 push @freeslot, splice @holdslot;
1100 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1101
1102
1103 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1104 while (%proc)
1105 {
1106   if ($main::please_continue) {
1107     $main::please_continue = 0;
1108     goto THISROUND;
1109   }
1110   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1111   readfrompipes ();
1112   if (!reapchildren())
1113   {
1114     check_refresh_wanted();
1115     check_squeue();
1116     update_progress_stats();
1117     select (undef, undef, undef, 0.1);
1118     killem (keys %proc) if $main::please_freeze;
1119   }
1120 }
1121
1122 update_progress_stats();
1123 freeze_if_want_freeze();
1124
1125
1126 if (!defined $main::success)
1127 {
1128   if (!@jobstep_todo) {
1129     $main::success = 1;
1130   } elsif ($working_slot_count < 1) {
1131     save_output_collection();
1132     save_meta();
1133     exit(EX_RETRY_UNLOCKED);
1134   } elsif ($thisround_succeeded == 0 &&
1135            ($thisround_failed == 0 || $thisround_failed > 4)) {
1136     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1137     Log (undef, $message);
1138     $main::success = 0;
1139   }
1140 }
1141
1142 goto ONELEVEL if !defined $main::success;
1143
1144
1145 release_allocation();
1146 freeze();
1147 my $collated_output = save_output_collection();
1148 Log (undef, "finish");
1149
1150 save_meta();
1151
1152 my $final_state;
1153 if ($collated_output && $main::success) {
1154   $final_state = 'Complete';
1155 } else {
1156   $final_state = 'Failed';
1157 }
1158 $Job->update_attributes('state' => $final_state);
1159
1160 exit (($final_state eq 'Complete') ? 0 : 1);
1161
1162
1163
1164 sub update_progress_stats
1165 {
1166   $progress_stats_updated = time;
1167   return if !$progress_is_dirty;
1168   my ($todo, $done, $running) = (scalar @jobstep_todo,
1169                                  scalar @jobstep_done,
1170                                  scalar keys(%proc));
1171   $Job->{'tasks_summary'} ||= {};
1172   $Job->{'tasks_summary'}->{'todo'} = $todo;
1173   $Job->{'tasks_summary'}->{'done'} = $done;
1174   $Job->{'tasks_summary'}->{'running'} = $running;
1175   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1176   Log (undef, "status: $done done, $running running, $todo todo");
1177   $progress_is_dirty = 0;
1178 }
1179
1180
1181
1182 sub reapchildren
1183 {
1184   my $pid = waitpid (-1, WNOHANG);
1185   return 0 if $pid <= 0;
1186
1187   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1188                   . "."
1189                   . $slot[$proc{$pid}->{slot}]->{cpu});
1190   my $jobstepid = $proc{$pid}->{jobstep};
1191   my $elapsed = time - $proc{$pid}->{time};
1192   my $Jobstep = $jobstep[$jobstepid];
1193
1194   my $childstatus = $?;
1195   my $exitvalue = $childstatus >> 8;
1196   my $exitinfo = "exit ".exit_status_s($childstatus);
1197   $Jobstep->{'arvados_task'}->reload;
1198   my $task_success = $Jobstep->{'arvados_task'}->{success};
1199
1200   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1201
1202   if (!defined $task_success) {
1203     # task did not indicate one way or the other --> fail
1204     Log($jobstepid, sprintf(
1205           "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1206           exit_status_s($childstatus)));
1207     $Jobstep->{'arvados_task'}->{success} = 0;
1208     $Jobstep->{'arvados_task'}->save;
1209     $task_success = 0;
1210   }
1211
1212   if (!$task_success)
1213   {
1214     my $temporary_fail;
1215     $temporary_fail ||= $Jobstep->{tempfail};
1216     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1217
1218     ++$thisround_failed;
1219     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1220
1221     # Check for signs of a failed or misconfigured node
1222     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1223         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1224       # Don't count this against jobstep failure thresholds if this
1225       # node is already suspected faulty and srun exited quickly
1226       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1227           $elapsed < 5) {
1228         Log ($jobstepid, "blaming failure on suspect node " .
1229              $slot[$proc{$pid}->{slot}]->{node}->{name});
1230         $temporary_fail ||= 1;
1231       }
1232       ban_node_by_slot($proc{$pid}->{slot});
1233     }
1234
1235     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1236                              ++$Jobstep->{'failures'},
1237                              $temporary_fail ? 'temporary' : 'permanent',
1238                              $elapsed));
1239
1240     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1241       # Give up on this task, and the whole job
1242       $main::success = 0;
1243     }
1244     # Put this task back on the todo queue
1245     push @jobstep_todo, $jobstepid;
1246     $Job->{'tasks_summary'}->{'failed'}++;
1247   }
1248   else
1249   {
1250     ++$thisround_succeeded;
1251     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1252     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1253     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1254     push @jobstep_done, $jobstepid;
1255     Log ($jobstepid, "success in $elapsed seconds");
1256   }
1257   $Jobstep->{exitcode} = $childstatus;
1258   $Jobstep->{finishtime} = time;
1259   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1260   $Jobstep->{'arvados_task'}->save;
1261   process_stderr ($jobstepid, $task_success);
1262   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1263                            length($Jobstep->{'arvados_task'}->{output}),
1264                            $Jobstep->{'arvados_task'}->{output}));
1265
1266   close $reader{$jobstepid};
1267   delete $reader{$jobstepid};
1268   delete $slot[$proc{$pid}->{slot}]->{pid};
1269   push @freeslot, $proc{$pid}->{slot};
1270   delete $proc{$pid};
1271
1272   if ($task_success) {
1273     # Load new tasks
1274     my $newtask_list = [];
1275     my $newtask_results;
1276     do {
1277       $newtask_results = api_call(
1278         "job_tasks/list",
1279         'where' => {
1280           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1281         },
1282         'order' => 'qsequence',
1283         'offset' => scalar(@$newtask_list),
1284       );
1285       push(@$newtask_list, @{$newtask_results->{items}});
1286     } while (@{$newtask_results->{items}});
1287     foreach my $arvados_task (@$newtask_list) {
1288       my $jobstep = {
1289         'level' => $arvados_task->{'sequence'},
1290         'failures' => 0,
1291         'arvados_task' => $arvados_task
1292       };
1293       push @jobstep, $jobstep;
1294       push @jobstep_todo, $#jobstep;
1295     }
1296   }
1297
1298   $progress_is_dirty = 1;
1299   1;
1300 }
1301
1302 sub check_refresh_wanted
1303 {
1304   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1305   if (@stat && $stat[9] > $latest_refresh) {
1306     $latest_refresh = scalar time;
1307     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1308     for my $attr ('cancelled_at',
1309                   'cancelled_by_user_uuid',
1310                   'cancelled_by_client_uuid',
1311                   'state') {
1312       $Job->{$attr} = $Job2->{$attr};
1313     }
1314     if ($Job->{'state'} ne "Running") {
1315       if ($Job->{'state'} eq "Cancelled") {
1316         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1317       } else {
1318         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1319       }
1320       $main::success = 0;
1321       $main::please_freeze = 1;
1322     }
1323   }
1324 }
1325
1326 sub check_squeue
1327 {
1328   my $last_squeue_check = $squeue_checked;
1329
1330   # Do not call `squeue` or check the kill list more than once every
1331   # 15 seconds.
1332   return if $last_squeue_check > time - 15;
1333   $squeue_checked = time;
1334
1335   # Look for children from which we haven't received stderr data since
1336   # the last squeue check. If no such children exist, all procs are
1337   # alive and there's no need to even look at squeue.
1338   #
1339   # As long as the crunchstat poll interval (10s) is shorter than the
1340   # squeue check interval (15s) this should make the squeue check an
1341   # infrequent event.
1342   my $silent_procs = 0;
1343   for my $procinfo (values %proc)
1344   {
1345     my $jobstep = $jobstep[$procinfo->{jobstep}];
1346     if ($jobstep->{stderr_at} < $last_squeue_check)
1347     {
1348       $silent_procs++;
1349     }
1350   }
1351   return if $silent_procs == 0;
1352
1353   # use killem() on procs whose killtime is reached
1354   while (my ($pid, $procinfo) = each %proc)
1355   {
1356     my $jobstep = $jobstep[$procinfo->{jobstep}];
1357     if (exists $procinfo->{killtime}
1358         && $procinfo->{killtime} <= time
1359         && $jobstep->{stderr_at} < $last_squeue_check)
1360     {
1361       my $sincewhen = "";
1362       if ($jobstep->{stderr_at}) {
1363         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1364       }
1365       Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1366       killem ($pid);
1367     }
1368   }
1369
1370   if (!$have_slurm)
1371   {
1372     # here is an opportunity to check for mysterious problems with local procs
1373     return;
1374   }
1375
1376   # Get a list of steps still running.  Note: squeue(1) says --steps
1377   # selects a format (which we override anyway) and allows us to
1378   # specify which steps we're interested in (which we don't).
1379   # Importantly, it also changes the meaning of %j from "job name" to
1380   # "step name" and (although this isn't mentioned explicitly in the
1381   # docs) switches from "one line per job" mode to "one line per step"
1382   # mode. Without it, we'd just get a list of one job, instead of a
1383   # list of N steps.
1384   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1385   if ($? != 0)
1386   {
1387     Log(undef, "warning: squeue exit status $? ($!)");
1388     return;
1389   }
1390   chop @squeue;
1391
1392   # which of my jobsteps are running, according to squeue?
1393   my %ok;
1394   for my $jobstepname (@squeue)
1395   {
1396     $ok{$jobstepname} = 1;
1397   }
1398
1399   # Check for child procs >60s old and not mentioned by squeue.
1400   while (my ($pid, $procinfo) = each %proc)
1401   {
1402     if ($procinfo->{time} < time - 60
1403         && $procinfo->{jobstepname}
1404         && !exists $ok{$procinfo->{jobstepname}}
1405         && !exists $procinfo->{killtime})
1406     {
1407       # According to slurm, this task has ended (successfully or not)
1408       # -- but our srun child hasn't exited. First we must wait (30
1409       # seconds) in case this is just a race between communication
1410       # channels. Then, if our srun child process still hasn't
1411       # terminated, we'll conclude some slurm communication
1412       # error/delay has caused the task to die without notifying srun,
1413       # and we'll kill srun ourselves.
1414       $procinfo->{killtime} = time + 30;
1415       Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1416     }
1417   }
1418 }
1419
1420
1421 sub release_allocation
1422 {
1423   if ($have_slurm)
1424   {
1425     Log (undef, "release job allocation");
1426     system "scancel $ENV{SLURM_JOB_ID}";
1427   }
1428 }
1429
1430
1431 sub readfrompipes
1432 {
1433   my $gotsome = 0;
1434   foreach my $job (keys %reader)
1435   {
1436     my $buf;
1437     while (0 < sysread ($reader{$job}, $buf, 8192))
1438     {
1439       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1440       $jobstep[$job]->{stderr_at} = time;
1441       $jobstep[$job]->{stderr} .= $buf;
1442       preprocess_stderr ($job);
1443       if (length ($jobstep[$job]->{stderr}) > 16384)
1444       {
1445         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1446       }
1447       $gotsome = 1;
1448     }
1449   }
1450   return $gotsome;
1451 }
1452
1453
1454 sub preprocess_stderr
1455 {
1456   my $job = shift;
1457
1458   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1459     my $line = $1;
1460     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1461     Log ($job, "stderr $line");
1462     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1463       # whoa.
1464       $main::please_freeze = 1;
1465     }
1466     elsif ($line =~ /srun: error: Node failure on/) {
1467       my $job_slot_index = $jobstep[$job]->{slotindex};
1468       $slot[$job_slot_index]->{node}->{fail_count}++;
1469       $jobstep[$job]->{tempfail} = 1;
1470       ban_node_by_slot($job_slot_index);
1471     }
1472     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1473       $jobstep[$job]->{tempfail} = 1;
1474       ban_node_by_slot($jobstep[$job]->{slotindex});
1475     }
1476     elsif ($line =~ /arvados\.errors\.Keep/) {
1477       $jobstep[$job]->{tempfail} = 1;
1478     }
1479   }
1480 }
1481
1482
1483 sub process_stderr
1484 {
1485   my $job = shift;
1486   my $task_success = shift;
1487   preprocess_stderr ($job);
1488
1489   map {
1490     Log ($job, "stderr $_");
1491   } split ("\n", $jobstep[$job]->{stderr});
1492 }
1493
1494 sub fetch_block
1495 {
1496   my $hash = shift;
1497   my $keep;
1498   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1499     Log(undef, "fetch_block run error from arv-get $hash: $!");
1500     return undef;
1501   }
1502   my $output_block = "";
1503   while (1) {
1504     my $buf;
1505     my $bytes = sysread($keep, $buf, 1024 * 1024);
1506     if (!defined $bytes) {
1507       Log(undef, "fetch_block read error from arv-get: $!");
1508       $output_block = undef;
1509       last;
1510     } elsif ($bytes == 0) {
1511       # sysread returns 0 at the end of the pipe.
1512       last;
1513     } else {
1514       # some bytes were read into buf.
1515       $output_block .= $buf;
1516     }
1517   }
1518   close $keep;
1519   if ($?) {
1520     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1521     $output_block = undef;
1522   }
1523   return $output_block;
1524 }
1525
1526 # Create a collection by concatenating the output of all tasks (each
1527 # task's output is either a manifest fragment, a locator for a
1528 # manifest fragment stored in Keep, or nothing at all). Return the
1529 # portable_data_hash of the new collection.
1530 sub create_output_collection
1531 {
1532   Log (undef, "collate");
1533
1534   my ($child_out, $child_in);
1535   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1536 import arvados
1537 import sys
1538 print (arvados.api("v1").collections().
1539        create(body={"manifest_text": sys.stdin.read()}).
1540        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1541 }, retry_count());
1542
1543   my $task_idx = -1;
1544   my $manifest_size = 0;
1545   for (@jobstep)
1546   {
1547     ++$task_idx;
1548     my $output = $_->{'arvados_task'}->{output};
1549     next if (!defined($output));
1550     my $next_write;
1551     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1552       $next_write = fetch_block($output);
1553     } else {
1554       $next_write = $output;
1555     }
1556     if (defined($next_write)) {
1557       if (!defined(syswrite($child_in, $next_write))) {
1558         # There's been an error writing.  Stop the loop.
1559         # We'll log details about the exit code later.
1560         last;
1561       } else {
1562         $manifest_size += length($next_write);
1563       }
1564     } else {
1565       my $uuid = $_->{'arvados_task'}->{'uuid'};
1566       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1567       $main::success = 0;
1568     }
1569   }
1570   close($child_in);
1571   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1572
1573   my $joboutput;
1574   my $s = IO::Select->new($child_out);
1575   if ($s->can_read(120)) {
1576     sysread($child_out, $joboutput, 1024 * 1024);
1577     waitpid($pid, 0);
1578     if ($?) {
1579       Log(undef, "output collection creation exited " . exit_status_s($?));
1580       $joboutput = undef;
1581     } else {
1582       chomp($joboutput);
1583     }
1584   } else {
1585     Log (undef, "timed out while creating output collection");
1586     foreach my $signal (2, 2, 2, 15, 15, 9) {
1587       kill($signal, $pid);
1588       last if waitpid($pid, WNOHANG) == -1;
1589       sleep(1);
1590     }
1591   }
1592   close($child_out);
1593
1594   return $joboutput;
1595 }
1596
1597 # Calls create_output_collection, logs the result, and returns it.
1598 # If that was successful, save that as the output in the job record.
1599 sub save_output_collection {
1600   my $collated_output = create_output_collection();
1601
1602   if (!$collated_output) {
1603     Log(undef, "Failed to write output collection");
1604   }
1605   else {
1606     Log(undef, "job output $collated_output");
1607     $Job->update_attributes('output' => $collated_output);
1608   }
1609   return $collated_output;
1610 }
1611
1612 sub killem
1613 {
1614   foreach (@_)
1615   {
1616     my $sig = 2;                # SIGINT first
1617     if (exists $proc{$_}->{"sent_$sig"} &&
1618         time - $proc{$_}->{"sent_$sig"} > 4)
1619     {
1620       $sig = 15;                # SIGTERM if SIGINT doesn't work
1621     }
1622     if (exists $proc{$_}->{"sent_$sig"} &&
1623         time - $proc{$_}->{"sent_$sig"} > 4)
1624     {
1625       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1626     }
1627     if (!exists $proc{$_}->{"sent_$sig"})
1628     {
1629       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1630       kill $sig, $_;
1631       select (undef, undef, undef, 0.1);
1632       if ($sig == 2)
1633       {
1634         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1635       }
1636       $proc{$_}->{"sent_$sig"} = time;
1637       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1638     }
1639   }
1640 }
1641
1642
1643 sub fhbits
1644 {
1645   my($bits);
1646   for (@_) {
1647     vec($bits,fileno($_),1) = 1;
1648   }
1649   $bits;
1650 }
1651
1652
1653 # Send log output to Keep via arv-put.
1654 #
1655 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1656 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1657 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1658 # $log_pipe_pid is the pid of the arv-put subprocess.
1659 #
1660 # The only functions that should access these variables directly are:
1661 #
1662 # log_writer_start($logfilename)
1663 #     Starts an arv-put pipe, reading data on stdin and writing it to
1664 #     a $logfilename file in an output collection.
1665 #
1666 # log_writer_read_output([$timeout])
1667 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1668 #     Passes $timeout to the select() call, with a default of 0.01.
1669 #     Returns the result of the last read() call on $log_pipe_out, or
1670 #     -1 if read() wasn't called because select() timed out.
1671 #     Only other log_writer_* functions should need to call this.
1672 #
1673 # log_writer_send($txt)
1674 #     Writes $txt to the output log collection.
1675 #
1676 # log_writer_finish()
1677 #     Closes the arv-put pipe and returns the output that it produces.
1678 #
1679 # log_writer_is_active()
1680 #     Returns a true value if there is currently a live arv-put
1681 #     process, false otherwise.
1682 #
1683 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1684     $log_pipe_pid);
1685
1686 sub log_writer_start($)
1687 {
1688   my $logfilename = shift;
1689   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1690                         'arv-put',
1691                         '--stream',
1692                         '--retries', '3',
1693                         '--filename', $logfilename,
1694                         '-');
1695   $log_pipe_out_buf = "";
1696   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1697 }
1698
1699 sub log_writer_read_output {
1700   my $timeout = shift || 0.01;
1701   my $read = -1;
1702   while ($read && $log_pipe_out_select->can_read($timeout)) {
1703     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1704                  length($log_pipe_out_buf));
1705   }
1706   if (!defined($read)) {
1707     Log(undef, "error reading log manifest from arv-put: $!");
1708   }
1709   return $read;
1710 }
1711
1712 sub log_writer_send($)
1713 {
1714   my $txt = shift;
1715   print $log_pipe_in $txt;
1716   log_writer_read_output();
1717 }
1718
1719 sub log_writer_finish()
1720 {
1721   return unless $log_pipe_pid;
1722
1723   close($log_pipe_in);
1724
1725   my $logger_failed = 0;
1726   my $read_result = log_writer_read_output(120);
1727   if ($read_result == -1) {
1728     $logger_failed = -1;
1729     Log (undef, "timed out reading from 'arv-put'");
1730   } elsif ($read_result != 0) {
1731     $logger_failed = -2;
1732     Log(undef, "failed to read arv-put log manifest to EOF");
1733   }
1734
1735   waitpid($log_pipe_pid, 0);
1736   if ($?) {
1737     $logger_failed ||= $?;
1738     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1739   }
1740
1741   close($log_pipe_out);
1742   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1743   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1744       $log_pipe_out_select = undef;
1745
1746   return $arv_put_output;
1747 }
1748
1749 sub log_writer_is_active() {
1750   return $log_pipe_pid;
1751 }
1752
1753 sub Log                         # ($jobstep_id, $logmessage)
1754 {
1755   if ($_[1] =~ /\n/) {
1756     for my $line (split (/\n/, $_[1])) {
1757       Log ($_[0], $line);
1758     }
1759     return;
1760   }
1761   my $fh = select STDERR; $|=1; select $fh;
1762   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1763   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1764   $message .= "\n";
1765   my $datetime;
1766   if (log_writer_is_active() || -t STDERR) {
1767     my @gmtime = gmtime;
1768     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1769                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1770   }
1771   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1772
1773   if (log_writer_is_active()) {
1774     log_writer_send($datetime . " " . $message);
1775   }
1776 }
1777
1778
1779 sub croak
1780 {
1781   my ($package, $file, $line) = caller;
1782   my $message = "@_ at $file line $line\n";
1783   Log (undef, $message);
1784   freeze() if @jobstep_todo;
1785   create_output_collection() if @jobstep_todo;
1786   cleanup();
1787   save_meta();
1788   die;
1789 }
1790
1791
1792 sub cleanup
1793 {
1794   return unless $Job;
1795   if ($Job->{'state'} eq 'Cancelled') {
1796     $Job->update_attributes('finished_at' => scalar gmtime);
1797   } else {
1798     $Job->update_attributes('state' => 'Failed');
1799   }
1800 }
1801
1802
1803 sub save_meta
1804 {
1805   my $justcheckpoint = shift; # false if this will be the last meta saved
1806   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1807   return unless log_writer_is_active();
1808   my $log_manifest = log_writer_finish();
1809   return unless defined($log_manifest);
1810
1811   if ($Job->{log}) {
1812     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1813     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1814   }
1815
1816   my $log_coll = api_call(
1817     "collections/create", ensure_unique_name => 1, collection => {
1818       manifest_text => $log_manifest,
1819       owner_uuid => $Job->{owner_uuid},
1820       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1821     });
1822   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1823   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1824 }
1825
1826
1827 sub freeze_if_want_freeze
1828 {
1829   if ($main::please_freeze)
1830   {
1831     release_allocation();
1832     if (@_)
1833     {
1834       # kill some srun procs before freeze+stop
1835       map { $proc{$_} = {} } @_;
1836       while (%proc)
1837       {
1838         killem (keys %proc);
1839         select (undef, undef, undef, 0.1);
1840         my $died;
1841         while (($died = waitpid (-1, WNOHANG)) > 0)
1842         {
1843           delete $proc{$died};
1844         }
1845       }
1846     }
1847     freeze();
1848     create_output_collection();
1849     cleanup();
1850     save_meta();
1851     exit 1;
1852   }
1853 }
1854
1855
1856 sub freeze
1857 {
1858   Log (undef, "Freeze not implemented");
1859   return;
1860 }
1861
1862
1863 sub thaw
1864 {
1865   croak ("Thaw not implemented");
1866 }
1867
1868
1869 sub freezequote
1870 {
1871   my $s = shift;
1872   $s =~ s/\\/\\\\/g;
1873   $s =~ s/\n/\\n/g;
1874   return $s;
1875 }
1876
1877
1878 sub freezeunquote
1879 {
1880   my $s = shift;
1881   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1882   return $s;
1883 }
1884
1885
1886 sub srun
1887 {
1888   my $srunargs = shift;
1889   my $execargs = shift;
1890   my $opts = shift || {};
1891   my $stdin = shift;
1892   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1893
1894   $Data::Dumper::Terse = 1;
1895   $Data::Dumper::Indent = 0;
1896   my $show_cmd = Dumper($args);
1897   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1898   $show_cmd =~ s/\n/ /g;
1899   if ($opts->{fork}) {
1900     Log(undef, "starting: $show_cmd");
1901   } else {
1902     # This is a child process: parent is in charge of reading our
1903     # stderr and copying it to Log() if needed.
1904     warn "starting: $show_cmd\n";
1905   }
1906
1907   if (defined $stdin) {
1908     my $child = open STDIN, "-|";
1909     defined $child or die "no fork: $!";
1910     if ($child == 0) {
1911       print $stdin or die $!;
1912       close STDOUT or die $!;
1913       exit 0;
1914     }
1915   }
1916
1917   return system (@$args) if $opts->{fork};
1918
1919   exec @$args;
1920   warn "ENV size is ".length(join(" ",%ENV));
1921   die "exec failed: $!: @$args";
1922 }
1923
1924
1925 sub ban_node_by_slot {
1926   # Don't start any new jobsteps on this node for 60 seconds
1927   my $slotid = shift;
1928   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1929   $slot[$slotid]->{node}->{hold_count}++;
1930   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1931 }
1932
1933 sub must_lock_now
1934 {
1935   my ($lockfile, $error_message) = @_;
1936   open L, ">", $lockfile or croak("$lockfile: $!");
1937   if (!flock L, LOCK_EX|LOCK_NB) {
1938     croak("Can't lock $lockfile: $error_message\n");
1939   }
1940 }
1941
1942 sub find_docker_image {
1943   # Given a Keep locator, check to see if it contains a Docker image.
1944   # If so, return its stream name and Docker hash.
1945   # If not, return undef for both values.
1946   my $locator = shift;
1947   my ($streamname, $filename);
1948   my $image = api_call("collections/get", uuid => $locator);
1949   if ($image) {
1950     foreach my $line (split(/\n/, $image->{manifest_text})) {
1951       my @tokens = split(/\s+/, $line);
1952       next if (!@tokens);
1953       $streamname = shift(@tokens);
1954       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1955         if (defined($filename)) {
1956           return (undef, undef);  # More than one file in the Collection.
1957         } else {
1958           $filename = (split(/:/, $filedata, 3))[2];
1959         }
1960       }
1961     }
1962   }
1963   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1964     return ($streamname, $1);
1965   } else {
1966     return (undef, undef);
1967   }
1968 }
1969
1970 sub retry_count {
1971   # Calculate the number of times an operation should be retried,
1972   # assuming exponential backoff, and that we're willing to retry as
1973   # long as tasks have been running.  Enforce a minimum of 3 retries.
1974   my ($starttime, $endtime, $timediff, $retries);
1975   if (@jobstep) {
1976     $starttime = $jobstep[0]->{starttime};
1977     $endtime = $jobstep[-1]->{finishtime};
1978   }
1979   if (!defined($starttime)) {
1980     $timediff = 0;
1981   } elsif (!defined($endtime)) {
1982     $timediff = time - $starttime;
1983   } else {
1984     $timediff = ($endtime - $starttime) - (time - $endtime);
1985   }
1986   if ($timediff > 0) {
1987     $retries = int(log($timediff) / log(2));
1988   } else {
1989     $retries = 1;  # Use the minimum.
1990   }
1991   return ($retries > 3) ? $retries : 3;
1992 }
1993
1994 sub retry_op {
1995   # Pass in two function references.
1996   # This method will be called with the remaining arguments.
1997   # If it dies, retry it with exponential backoff until it succeeds,
1998   # or until the current retry_count is exhausted.  After each failure
1999   # that can be retried, the second function will be called with
2000   # the current try count (0-based), next try time, and error message.
2001   my $operation = shift;
2002   my $retry_callback = shift;
2003   my $retries = retry_count();
2004   foreach my $try_count (0..$retries) {
2005     my $next_try = time + (2 ** $try_count);
2006     my $result = eval { $operation->(@_); };
2007     if (!$@) {
2008       return $result;
2009     } elsif ($try_count < $retries) {
2010       $retry_callback->($try_count, $next_try, $@);
2011       my $sleep_time = $next_try - time;
2012       sleep($sleep_time) if ($sleep_time > 0);
2013     }
2014   }
2015   # Ensure the error message ends in a newline, so Perl doesn't add
2016   # retry_op's line number to it.
2017   chomp($@);
2018   die($@ . "\n");
2019 }
2020
2021 sub api_call {
2022   # Pass in a /-separated API method name, and arguments for it.
2023   # This function will call that method, retrying as needed until
2024   # the current retry_count is exhausted, with a log on the first failure.
2025   my $method_name = shift;
2026   my $log_api_retry = sub {
2027     my ($try_count, $next_try_at, $errmsg) = @_;
2028     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2029     $errmsg =~ s/\s/ /g;
2030     $errmsg =~ s/\s+$//;
2031     my $retry_msg;
2032     if ($next_try_at < time) {
2033       $retry_msg = "Retrying.";
2034     } else {
2035       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2036       $retry_msg = "Retrying at $next_try_fmt.";
2037     }
2038     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2039   };
2040   my $method = $arv;
2041   foreach my $key (split(/\//, $method_name)) {
2042     $method = $method->{$key};
2043   }
2044   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2045 }
2046
2047 sub exit_status_s {
2048   # Given a $?, return a human-readable exit code string like "0" or
2049   # "1" or "0 with signal 1" or "1 with signal 11".
2050   my $exitcode = shift;
2051   my $s = $exitcode >> 8;
2052   if ($exitcode & 0x7f) {
2053     $s .= " with signal " . ($exitcode & 0x7f);
2054   }
2055   if ($exitcode & 0x80) {
2056     $s .= " with core dump";
2057   }
2058   return $s;
2059 }
2060
2061 sub handle_readall {
2062   # Pass in a glob reference to a file handle.
2063   # Read all its contents and return them as a string.
2064   my $fh_glob_ref = shift;
2065   local $/ = undef;
2066   return <$fh_glob_ref>;
2067 }
2068
2069 sub tar_filename_n {
2070   my $n = shift;
2071   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2072 }
2073
2074 sub add_git_archive {
2075   # Pass in a git archive command as a string or list, a la system().
2076   # This method will save its output to be included in the archive sent to the
2077   # build script.
2078   my $git_input;
2079   $git_tar_count++;
2080   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2081     croak("Failed to save git archive: $!");
2082   }
2083   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2084   close($git_input);
2085   waitpid($git_pid, 0);
2086   close(GIT_ARCHIVE);
2087   if ($?) {
2088     croak("Failed to save git archive: git exited " . exit_status_s($?));
2089   }
2090 }
2091
2092 sub combined_git_archive {
2093   # Combine all saved tar archives into a single archive, then return its
2094   # contents in a string.  Return undef if no archives have been saved.
2095   if ($git_tar_count < 1) {
2096     return undef;
2097   }
2098   my $base_tar_name = tar_filename_n(1);
2099   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2100     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2101     if ($tar_exit != 0) {
2102       croak("Error preparing build archive: tar -A exited " .
2103             exit_status_s($tar_exit));
2104     }
2105   }
2106   if (!open(GIT_TAR, "<", $base_tar_name)) {
2107     croak("Could not open build archive: $!");
2108   }
2109   my $tar_contents = handle_readall(\*GIT_TAR);
2110   close(GIT_TAR);
2111   return $tar_contents;
2112 }
2113
2114 sub set_nonblocking {
2115   my $fh = shift;
2116   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2117   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2118 }
2119
2120 __DATA__
2121 #!/usr/bin/env perl
2122 #
2123 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2124 # server invokes this script on individual compute nodes, or localhost if we're
2125 # running a job locally.  It gets called in two modes:
2126 #
2127 # * No arguments: Installation mode.  Read a tar archive from the DATA
2128 #   file handle; it includes the Crunch script's source code, and
2129 #   maybe SDKs as well.  Those should be installed in the proper
2130 #   locations.  This runs outside of any Docker container, so don't try to
2131 #   introspect Crunch's runtime environment.
2132 #
2133 # * With arguments: Crunch script run mode.  This script should set up the
2134 #   environment, then run the command specified in the arguments.  This runs
2135 #   inside any Docker container.
2136
2137 use Fcntl ':flock';
2138 use File::Path qw( make_path remove_tree );
2139 use POSIX qw(getcwd);
2140
2141 use constant TASK_TEMPFAIL => 111;
2142
2143 # Map SDK subdirectories to the path environments they belong to.
2144 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2145
2146 my $destdir = $ENV{"CRUNCH_SRC"};
2147 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2148 my $repo = $ENV{"CRUNCH_SRC_URL"};
2149 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2150 my $job_work = $ENV{"JOB_WORK"};
2151 my $task_work = $ENV{"TASK_WORK"};
2152
2153 open(STDOUT_ORIG, ">&", STDOUT);
2154 open(STDERR_ORIG, ">&", STDERR);
2155
2156 for my $dir ($destdir, $job_work, $task_work) {
2157   if ($dir) {
2158     make_path $dir;
2159     -e $dir or die "Failed to create temporary directory ($dir): $!";
2160   }
2161 }
2162
2163 if ($task_work) {
2164   remove_tree($task_work, {keep_root => 1});
2165 }
2166
2167 ### Crunch script run mode
2168 if (@ARGV) {
2169   # We want to do routine logging during task 0 only.  This gives the user
2170   # the information they need, but avoids repeating the information for every
2171   # task.
2172   my $Log;
2173   if ($ENV{TASK_SEQUENCE} eq "0") {
2174     $Log = sub {
2175       my $msg = shift;
2176       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2177     };
2178   } else {
2179     $Log = sub { };
2180   }
2181
2182   my $python_src = "$install_dir/python";
2183   my $venv_dir = "$job_work/.arvados.venv";
2184   my $venv_built = -e "$venv_dir/bin/activate";
2185   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2186     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2187                  "--python=python2.7", $venv_dir);
2188     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2189     $venv_built = 1;
2190     $Log->("Built Python SDK virtualenv");
2191   }
2192
2193   my @pysdk_version_cmd = ("python", "-c",
2194     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2195   if ($venv_built) {
2196     $Log->("Running in Python SDK virtualenv");
2197     @pysdk_version_cmd = ();
2198     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2199     @ARGV = ("/bin/sh", "-ec",
2200              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2201   } elsif (-d $python_src) {
2202     $Log->("Warning: virtualenv not found inside Docker container default " .
2203            "\$PATH. Can't install Python SDK.");
2204   }
2205
2206   if (@pysdk_version_cmd) {
2207     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2208     my $pysdk_version = <$pysdk_version_pipe>;
2209     close($pysdk_version_pipe);
2210     if ($? == 0) {
2211       chomp($pysdk_version);
2212       $Log->("Using Arvados SDK version $pysdk_version");
2213     } else {
2214       # A lot could've gone wrong here, but pretty much all of it means that
2215       # Python won't be able to load the Arvados SDK.
2216       $Log->("Warning: Arvados SDK not found");
2217     }
2218   }
2219
2220   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2221     my $sdk_path = "$install_dir/$sdk_dir";
2222     if (-d $sdk_path) {
2223       if ($ENV{$sdk_envkey}) {
2224         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2225       } else {
2226         $ENV{$sdk_envkey} = $sdk_path;
2227       }
2228       $Log->("Arvados SDK added to %s", $sdk_envkey);
2229     }
2230   }
2231
2232   exec(@ARGV);
2233   die "Cannot exec `@ARGV`: $!";
2234 }
2235
2236 ### Installation mode
2237 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2238 flock L, LOCK_EX;
2239 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2240   # This exact git archive (source + arvados sdk) is already installed
2241   # here, so there's no need to reinstall it.
2242
2243   # We must consume our DATA section, though: otherwise the process
2244   # feeding it to us will get SIGPIPE.
2245   my $buf;
2246   while (read(DATA, $buf, 65536)) { }
2247
2248   exit(0);
2249 }
2250
2251 unlink "$destdir.archive_hash";
2252 mkdir $destdir;
2253
2254 do {
2255   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2256   local $SIG{PIPE} = "IGNORE";
2257   warn "Extracting archive: $archive_hash\n";
2258   # --ignore-zeros is necessary sometimes: depending on how much NUL
2259   # padding tar -A put on our combined archive (which in turn depends
2260   # on the length of the component archives) tar without
2261   # --ignore-zeros will exit before consuming stdin and cause close()
2262   # to fail on the resulting SIGPIPE.
2263   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2264     die "Error launching 'tar -xC $destdir': $!";
2265   }
2266   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2267   # get SIGPIPE.  We must feed it data incrementally.
2268   my $tar_input;
2269   while (read(DATA, $tar_input, 65536)) {
2270     print TARX $tar_input;
2271   }
2272   if(!close(TARX)) {
2273     die "'tar -xC $destdir' exited $?: $!";
2274   }
2275 };
2276
2277 mkdir $install_dir;
2278
2279 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2280 if (-d $sdk_root) {
2281   foreach my $sdk_lang (("python",
2282                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2283     if (-d "$sdk_root/$sdk_lang") {
2284       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2285         die "Failed to install $sdk_lang SDK: $!";
2286       }
2287     }
2288   }
2289 }
2290
2291 my $python_dir = "$install_dir/python";
2292 if ((-d $python_dir) and can_run("python2.7")) {
2293   open(my $egg_info_pipe, "-|",
2294        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2295   my @egg_info_errors = <$egg_info_pipe>;
2296   close($egg_info_pipe);
2297
2298   if ($?) {
2299     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2300       # egg_info apparently failed because it couldn't ask git for a build tag.
2301       # Specify no build tag.
2302       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2303       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2304       close($pysdk_cfg);
2305     } else {
2306       my $egg_info_exit = $? >> 8;
2307       foreach my $errline (@egg_info_errors) {
2308         warn $errline;
2309       }
2310       warn "python setup.py egg_info failed: exit $egg_info_exit";
2311       exit ($egg_info_exit || 1);
2312     }
2313   }
2314 }
2315
2316 # Hide messages from the install script (unless it fails: shell_or_die
2317 # will show $destdir.log in that case).
2318 open(STDOUT, ">>", "$destdir.log");
2319 open(STDERR, ">&", STDOUT);
2320
2321 if (-e "$destdir/crunch_scripts/install") {
2322     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2323 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2324     # Old version
2325     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2326 } elsif (-e "./install.sh") {
2327     shell_or_die (undef, "./install.sh", $install_dir);
2328 }
2329
2330 if ($archive_hash) {
2331     unlink "$destdir.archive_hash.new";
2332     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2333     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2334 }
2335
2336 close L;
2337
2338 sub can_run {
2339   my $command_name = shift;
2340   open(my $which, "-|", "which", $command_name);
2341   while (<$which>) { }
2342   close($which);
2343   return ($? == 0);
2344 }
2345
2346 sub shell_or_die
2347 {
2348   my $exitcode = shift;
2349
2350   if ($ENV{"DEBUG"}) {
2351     print STDERR "@_\n";
2352   }
2353   if (system (@_) != 0) {
2354     my $err = $!;
2355     my $code = $?;
2356     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2357     open STDERR, ">&STDERR_ORIG";
2358     system ("cat $destdir.log >&2");
2359     warn "@_ failed ($err): $exitstatus";
2360     if (defined($exitcode)) {
2361       exit $exitcode;
2362     }
2363     else {
2364       exit (($code >> 8) || 1);
2365     }
2366   }
2367 }
2368
2369 __DATA__