7582: fix typo --user=$try_user to $try_user_arg
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 GetOptions('force-unlock' => \$force_unlock,
131            'git-dir=s' => \$git_dir,
132            'job=s' => \$jobspec,
133            'job-api-token=s' => \$job_api_token,
134            'no-clear-tmp' => \$no_clear_tmp,
135            'resume-stash=s' => \$resume_stash,
136            'docker-bin=s' => \$docker_bin,
137     );
138
139 if (defined $job_api_token) {
140   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
141 }
142
143 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
144
145
146 $SIG{'USR1'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 1;
149 };
150 $SIG{'USR2'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 0;
153 };
154
155 my $arv = Arvados->new('apiVersion' => 'v1');
156
157 my $Job;
158 my $job_id;
159 my $dbh;
160 my $sth;
161 my @jobstep;
162
163 my $local_job;
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   $local_job = 0;
169 }
170 else
171 {
172   $local_job = JSON::decode_json($jobspec);
173 }
174
175
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely
178 # misconfigured.
179 my $cmd = ['true'];
180 if (($Job || $local_job)->{docker_image_locator}) {
181   $cmd = [$docker_bin, 'ps', '-q'];
182 }
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
185      $cmd,
186      {fork => 1});
187 if ($? != 0) {
188   Log(undef, "Sanity check failed: ".exit_status_s($?));
189   exit EX_TEMPFAIL;
190 }
191 Log(undef, "Sanity check OK");
192
193
194 my $User = api_call("users/current");
195
196 if (!$local_job) {
197   if (!$force_unlock) {
198     # Claim this job, and make sure nobody else does
199     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
200     if ($@) {
201       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
202       exit EX_TEMPFAIL;
203     };
204   }
205 }
206 else
207 {
208   if (!$resume_stash)
209   {
210     map { croak ("No $_ specified") unless $local_job->{$_} }
211     qw(script script_version script_parameters);
212   }
213
214   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
215   $local_job->{'started_at'} = gmtime;
216   $local_job->{'state'} = 'Running';
217
218   $Job = api_call("jobs/create", job => $local_job);
219 }
220 $job_id = $Job->{'uuid'};
221
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
224
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
228
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
230 if ($? == 0) {
231   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232   chomp($gem_versions);
233   chop($gem_versions);  # Closing parentheses
234 } else {
235   $gem_versions = "";
236 }
237 Log(undef,
238     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
239
240 Log (undef, "check slurm allocation");
241 my @slot;
242 my @node;
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
244 my @sinfo;
245 if (!$have_slurm)
246 {
247   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248   push @sinfo, "$localcpus localhost";
249 }
250 if (exists $ENV{SLURM_NODELIST})
251 {
252   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
253 }
254 foreach (@sinfo)
255 {
256   my ($ncpus, $slurm_nodelist) = split;
257   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
258
259   my @nodelist;
260   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
261   {
262     my $nodelist = $1;
263     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
264     {
265       my $ranges = $1;
266       foreach (split (",", $ranges))
267       {
268         my ($a, $b);
269         if (/(\d+)-(\d+)/)
270         {
271           $a = $1;
272           $b = $2;
273         }
274         else
275         {
276           $a = $_;
277           $b = $_;
278         }
279         push @nodelist, map {
280           my $n = $nodelist;
281           $n =~ s/\[[-,\d]+\]/$_/;
282           $n;
283         } ($a..$b);
284       }
285     }
286     else
287     {
288       push @nodelist, $nodelist;
289     }
290   }
291   foreach my $nodename (@nodelist)
292   {
293     Log (undef, "node $nodename - $ncpus slots");
294     my $node = { name => $nodename,
295                  ncpus => $ncpus,
296                  # The number of consecutive times a task has been dispatched
297                  # to this node and failed.
298                  losing_streak => 0,
299                  # The number of consecutive times that SLURM has reported
300                  # a node failure since the last successful task.
301                  fail_count => 0,
302                  # Don't dispatch work to this node until this time
303                  # (in seconds since the epoch) has passed.
304                  hold_until => 0 };
305     foreach my $cpu (1..$ncpus)
306     {
307       push @slot, { node => $node,
308                     cpu => $cpu };
309     }
310   }
311   push @node, @nodelist;
312 }
313
314
315
316 # Ensure that we get one jobstep running on each allocated node before
317 # we start overloading nodes with concurrent steps
318
319 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
320
321
322 $Job->update_attributes(
323   'tasks_summary' => { 'failed' => 0,
324                        'todo' => 1,
325                        'running' => 0,
326                        'done' => 0 });
327
328 Log (undef, "start");
329 $SIG{'INT'} = sub { $main::please_freeze = 1; };
330 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
331 $SIG{'TERM'} = \&croak;
332 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
333 $SIG{'ALRM'} = sub { $main::please_info = 1; };
334 $SIG{'CONT'} = sub { $main::please_continue = 1; };
335 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
336
337 $main::please_freeze = 0;
338 $main::please_info = 0;
339 $main::please_continue = 0;
340 $main::please_refresh = 0;
341 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
342
343 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
344 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
345 $ENV{"JOB_UUID"} = $job_id;
346
347
348 my @jobstep_todo = ();
349 my @jobstep_done = ();
350 my @jobstep_tomerge = ();
351 my $jobstep_tomerge_level = 0;
352 my $squeue_checked = 0;
353 my $latest_refresh = scalar time;
354
355
356
357 if (defined $Job->{thawedfromkey})
358 {
359   thaw ($Job->{thawedfromkey});
360 }
361 else
362 {
363   my $first_task = api_call("job_tasks/create", job_task => {
364     'job_uuid' => $Job->{'uuid'},
365     'sequence' => 0,
366     'qsequence' => 0,
367     'parameters' => {},
368   });
369   push @jobstep, { 'level' => 0,
370                    'failures' => 0,
371                    'arvados_task' => $first_task,
372                  };
373   push @jobstep_todo, 0;
374 }
375
376
377 if (!$have_slurm)
378 {
379   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
380 }
381
382 my $build_script = handle_readall(\*DATA);
383 my $nodelist = join(",", @node);
384 my $git_tar_count = 0;
385
386 if (!defined $no_clear_tmp) {
387   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
388   Log (undef, "Clean work dirs");
389
390   my $cleanpid = fork();
391   if ($cleanpid == 0)
392   {
393     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
394     # Then clean up work directories.
395     # TODO: When #5036 is done and widely deployed, we can limit mount's
396     # -t option to simply fuse.keep.
397     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
399     exit (1);
400   }
401   while (1)
402   {
403     last if $cleanpid == waitpid (-1, WNOHANG);
404     freeze_if_want_freeze ($cleanpid);
405     select (undef, undef, undef, 0.1);
406   }
407   if ($?) {
408     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
409     exit(EX_RETRY_UNLOCKED);
410   }
411 }
412
413 # If this job requires a Docker image, install that.
414 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
415 if ($docker_locator = $Job->{docker_image_locator}) {
416   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
417   if (!$docker_hash)
418   {
419     croak("No Docker image hash found from locator $docker_locator");
420   }
421   $docker_stream =~ s/^\.//;
422   my $docker_install_script = qq{
423 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
424     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
425 fi
426 };
427   my $docker_pid = fork();
428   if ($docker_pid == 0)
429   {
430     srun (["srun", "--nodelist=" . join(',', @node)],
431           ["/bin/sh", "-ec", $docker_install_script]);
432     exit ($?);
433   }
434   while (1)
435   {
436     last if $docker_pid == waitpid (-1, WNOHANG);
437     freeze_if_want_freeze ($docker_pid);
438     select (undef, undef, undef, 0.1);
439   }
440   if ($? != 0)
441   {
442     croak("Installing Docker image from $docker_locator exited "
443           .exit_status_s($?));
444   }
445
446   # Determine whether this version of Docker supports memory+swap limits.
447   srun(["srun", "--nodelist=" . $node[0]],
448        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
449       {fork => 1});
450   $docker_limitmem = ($? == 0);
451
452   # Find a non-root Docker user to use.
453   # Tries the default user for the container, then 'crunch', then 'nobody',
454   # testing for whether the actual user id is non-zero.  This defends against
455   # mistakes but not malice, but we intend to harden the security in the future
456   # so we don't want anyone getting used to their jobs running as root in their
457   # Docker containers.
458   my @tryusers = ("", "crunch", "nobody");
459   foreach my $try_user (@tryusers) {
460     my $try_user_arg;
461     if ($try_user eq "") {
462       Log(undef, "Checking if container default user is not UID 0");
463       $try_user_arg = "";
464     } else {
465       Log(undef, "Checking if user '$try_user' is not UID 0");
466       $try_user_arg = "--user=$try_user";
467     }
468     srun(["srun", "--nodelist=" . $node[0]],
469          ["/bin/sh", "-ec",
470           "a=`$docker_bin run --rm $try_user_arg $docker_hash id --user` && " .
471           " test \$a -ne 0"],
472          {fork => 1});
473     if ($? == 0) {
474       $dockeruserarg = $try_user_arg;
475       if ($try_user eq "") {
476         Log(undef, "Container will run with default user");
477       } else {
478         Log(undef, "Container will run with $dockeruserarg");
479       }
480       last;
481     }
482   }
483
484   if (!defined $dockeruserarg) {
485     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
486   }
487
488   if ($Job->{arvados_sdk_version}) {
489     # The job also specifies an Arvados SDK version.  Add the SDKs to the
490     # tar file for the build script to install.
491     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
492                        $Job->{arvados_sdk_version}));
493     add_git_archive("git", "--git-dir=$git_dir", "archive",
494                     "--prefix=.arvados.sdk/",
495                     $Job->{arvados_sdk_version}, "sdk");
496   }
497 }
498
499 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
500   # If script_version looks like an absolute path, *and* the --git-dir
501   # argument was not given -- which implies we were not invoked by
502   # crunch-dispatch -- we will use the given path as a working
503   # directory instead of resolving script_version to a git commit (or
504   # doing anything else with git).
505   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
506   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
507 }
508 else {
509   # Resolve the given script_version to a git commit sha1. Also, if
510   # the repository is remote, clone it into our local filesystem: this
511   # ensures "git archive" will work, and is necessary to reliably
512   # resolve a symbolic script_version like "master^".
513   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
514
515   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
516
517   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
518
519   # If we're running under crunch-dispatch, it will have already
520   # pulled the appropriate source tree into its own repository, and
521   # given us that repo's path as $git_dir.
522   #
523   # If we're running a "local" job, we might have to fetch content
524   # from a remote repository.
525   #
526   # (Currently crunch-dispatch gives a local path with --git-dir, but
527   # we might as well accept URLs there too in case it changes its
528   # mind.)
529   my $repo = $git_dir || $Job->{'repository'};
530
531   # Repository can be remote or local. If remote, we'll need to fetch it
532   # to a local dir before doing `git log` et al.
533   my $repo_location;
534
535   if ($repo =~ m{://|^[^/]*:}) {
536     # $repo is a git url we can clone, like git:// or https:// or
537     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
538     # not recognized here because distinguishing that from a local
539     # path is too fragile. If you really need something strange here,
540     # use the ssh:// form.
541     $repo_location = 'remote';
542   } elsif ($repo =~ m{^\.*/}) {
543     # $repo is a local path to a git index. We'll also resolve ../foo
544     # to ../foo/.git if the latter is a directory. To help
545     # disambiguate local paths from named hosted repositories, this
546     # form must be given as ./ or ../ if it's a relative path.
547     if (-d "$repo/.git") {
548       $repo = "$repo/.git";
549     }
550     $repo_location = 'local';
551   } else {
552     # $repo is none of the above. It must be the name of a hosted
553     # repository.
554     my $arv_repo_list = api_call("repositories/list",
555                                  'filters' => [['name','=',$repo]]);
556     my @repos_found = @{$arv_repo_list->{'items'}};
557     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
558     if ($n_found > 0) {
559       Log(undef, "Repository '$repo' -> "
560           . join(", ", map { $_->{'uuid'} } @repos_found));
561     }
562     if ($n_found != 1) {
563       croak("Error: Found $n_found repositories with name '$repo'.");
564     }
565     $repo = $repos_found[0]->{'fetch_url'};
566     $repo_location = 'remote';
567   }
568   Log(undef, "Using $repo_location repository '$repo'");
569   $ENV{"CRUNCH_SRC_URL"} = $repo;
570
571   # Resolve given script_version (we'll call that $treeish here) to a
572   # commit sha1 ($commit).
573   my $treeish = $Job->{'script_version'};
574   my $commit;
575   if ($repo_location eq 'remote') {
576     # We minimize excess object-fetching by re-using the same bare
577     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
578     # just keep adding remotes to it as needed.
579     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
580     my $gitcmd = "git --git-dir=\Q$local_repo\E";
581
582     # Set up our local repo for caching remote objects, making
583     # archives, etc.
584     if (!-d $local_repo) {
585       make_path($local_repo) or croak("Error: could not create $local_repo");
586     }
587     # This works (exits 0 and doesn't delete fetched objects) even
588     # if $local_repo is already initialized:
589     `$gitcmd init --bare`;
590     if ($?) {
591       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
592     }
593
594     # If $treeish looks like a hash (or abbrev hash) we look it up in
595     # our local cache first, since that's cheaper. (We don't want to
596     # do that with tags/branches though -- those change over time, so
597     # they should always be resolved by the remote repo.)
598     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
599       # Hide stderr because it's normal for this to fail:
600       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
601       if ($? == 0 &&
602           # Careful not to resolve a branch named abcdeff to commit 1234567:
603           $sha1 =~ /^$treeish/ &&
604           $sha1 =~ /^([0-9a-f]{40})$/s) {
605         $commit = $1;
606         Log(undef, "Commit $commit already present in $local_repo");
607       }
608     }
609
610     if (!defined $commit) {
611       # If $treeish isn't just a hash or abbrev hash, or isn't here
612       # yet, we need to fetch the remote to resolve it correctly.
613
614       # First, remove all local heads. This prevents a name that does
615       # not exist on the remote from resolving to (or colliding with)
616       # a previously fetched branch or tag (possibly from a different
617       # remote).
618       remove_tree("$local_repo/refs/heads", {keep_root => 1});
619
620       Log(undef, "Fetching objects from $repo to $local_repo");
621       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
622       if ($?) {
623         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
624       }
625     }
626
627     # Now that the data is all here, we will use our local repo for
628     # the rest of our git activities.
629     $repo = $local_repo;
630   }
631
632   my $gitcmd = "git --git-dir=\Q$repo\E";
633   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
634   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
635     croak("`$gitcmd rev-list` exited "
636           .exit_status_s($?)
637           .", '$treeish' not found, giving up");
638   }
639   $commit = $1;
640   Log(undef, "Version $treeish is commit $commit");
641
642   if ($commit ne $Job->{'script_version'}) {
643     # Record the real commit id in the database, frozentokey, logs,
644     # etc. -- instead of an abbreviation or a branch name which can
645     # become ambiguous or point to a different commit in the future.
646     if (!$Job->update_attributes('script_version' => $commit)) {
647       croak("Error: failed to update job's script_version attribute");
648     }
649   }
650
651   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
652   add_git_archive("$gitcmd archive ''\Q$commit\E");
653 }
654
655 my $git_archive = combined_git_archive();
656 if (!defined $git_archive) {
657   Log(undef, "Skip install phase (no git archive)");
658   if ($have_slurm) {
659     Log(undef, "Warning: This probably means workers have no source tree!");
660   }
661 }
662 else {
663   my $install_exited;
664   my $install_script_tries_left = 3;
665   for (my $attempts = 0; $attempts < 3; $attempts++) {
666     Log(undef, "Run install script on all workers");
667
668     my @srunargs = ("srun",
669                     "--nodelist=$nodelist",
670                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
671     my @execargs = ("sh", "-c",
672                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
673
674     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
675     my ($install_stderr_r, $install_stderr_w);
676     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
677     set_nonblocking($install_stderr_r);
678     my $installpid = fork();
679     if ($installpid == 0)
680     {
681       close($install_stderr_r);
682       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
683       open(STDOUT, ">&", $install_stderr_w);
684       open(STDERR, ">&", $install_stderr_w);
685       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
686       exit (1);
687     }
688     close($install_stderr_w);
689     # Tell freeze_if_want_freeze how to kill the child, otherwise the
690     # "waitpid(installpid)" loop won't get interrupted by a freeze:
691     $proc{$installpid} = {};
692     my $stderr_buf = '';
693     # Track whether anything appears on stderr other than slurm errors
694     # ("srun: ...") and the "starting: ..." message printed by the
695     # srun subroutine itself:
696     my $stderr_anything_from_script = 0;
697     my $match_our_own_errors = '^(srun: error: |starting: \[)';
698     while ($installpid != waitpid(-1, WNOHANG)) {
699       freeze_if_want_freeze ($installpid);
700       # Wait up to 0.1 seconds for something to appear on stderr, then
701       # do a non-blocking read.
702       my $bits = fhbits($install_stderr_r);
703       select ($bits, undef, $bits, 0.1);
704       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
705       {
706         while ($stderr_buf =~ /^(.*?)\n/) {
707           my $line = $1;
708           substr $stderr_buf, 0, 1+length($line), "";
709           Log(undef, "stderr $line");
710           if ($line !~ /$match_our_own_errors/) {
711             $stderr_anything_from_script = 1;
712           }
713         }
714       }
715     }
716     delete $proc{$installpid};
717     $install_exited = $?;
718     close($install_stderr_r);
719     if (length($stderr_buf) > 0) {
720       if ($stderr_buf !~ /$match_our_own_errors/) {
721         $stderr_anything_from_script = 1;
722       }
723       Log(undef, "stderr $stderr_buf")
724     }
725
726     Log (undef, "Install script exited ".exit_status_s($install_exited));
727     last if $install_exited == 0 || $main::please_freeze;
728     # If the install script fails but doesn't print an error message,
729     # the next thing anyone is likely to do is just run it again in
730     # case it was a transient problem like "slurm communication fails
731     # because the network isn't reliable enough". So we'll just do
732     # that ourselves (up to 3 attempts in total). OTOH, if there is an
733     # error message, the problem is more likely to have a real fix and
734     # we should fail the job so the fixing process can start, instead
735     # of doing 2 more attempts.
736     last if $stderr_anything_from_script;
737   }
738
739   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
740     unlink($tar_filename);
741   }
742
743   if ($install_exited != 0) {
744     croak("Giving up");
745   }
746 }
747
748 foreach (qw (script script_version script_parameters runtime_constraints))
749 {
750   Log (undef,
751        "$_ " .
752        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
753 }
754 foreach (split (/\n/, $Job->{knobs}))
755 {
756   Log (undef, "knob " . $_);
757 }
758
759
760
761 $main::success = undef;
762
763
764
765 ONELEVEL:
766
767 my $thisround_succeeded = 0;
768 my $thisround_failed = 0;
769 my $thisround_failed_multiple = 0;
770 my $working_slot_count = scalar(@slot);
771
772 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
773                        or $a <=> $b } @jobstep_todo;
774 my $level = $jobstep[$jobstep_todo[0]]->{level};
775
776 my $initial_tasks_this_level = 0;
777 foreach my $id (@jobstep_todo) {
778   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
779 }
780
781 # If the number of tasks scheduled at this level #T is smaller than the number
782 # of slots available #S, only use the first #T slots, or the first slot on
783 # each node, whichever number is greater.
784 #
785 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
786 # based on these numbers.  Using fewer slots makes more resources available
787 # to each individual task, which should normally be a better strategy when
788 # there are fewer of them running with less parallelism.
789 #
790 # Note that this calculation is not redone if the initial tasks at
791 # this level queue more tasks at the same level.  This may harm
792 # overall task throughput for that level.
793 my @freeslot;
794 if ($initial_tasks_this_level < @node) {
795   @freeslot = (0..$#node);
796 } elsif ($initial_tasks_this_level < @slot) {
797   @freeslot = (0..$initial_tasks_this_level - 1);
798 } else {
799   @freeslot = (0..$#slot);
800 }
801 my $round_num_freeslots = scalar(@freeslot);
802
803 my %round_max_slots = ();
804 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
805   my $this_slot = $slot[$freeslot[$ii]];
806   my $node_name = $this_slot->{node}->{name};
807   $round_max_slots{$node_name} ||= $this_slot->{cpu};
808   last if (scalar(keys(%round_max_slots)) >= @node);
809 }
810
811 Log(undef, "start level $level with $round_num_freeslots slots");
812 my @holdslot;
813 my %reader;
814 my $progress_is_dirty = 1;
815 my $progress_stats_updated = 0;
816
817 update_progress_stats();
818
819
820 THISROUND:
821 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
822 {
823   my $id = $jobstep_todo[$todo_ptr];
824   my $Jobstep = $jobstep[$id];
825   if ($Jobstep->{level} != $level)
826   {
827     next;
828   }
829
830   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
831   set_nonblocking($reader{$id});
832
833   my $childslot = $freeslot[0];
834   my $childnode = $slot[$childslot]->{node};
835   my $childslotname = join (".",
836                             $slot[$childslot]->{node}->{name},
837                             $slot[$childslot]->{cpu});
838
839   my $childpid = fork();
840   if ($childpid == 0)
841   {
842     $SIG{'INT'} = 'DEFAULT';
843     $SIG{'QUIT'} = 'DEFAULT';
844     $SIG{'TERM'} = 'DEFAULT';
845
846     foreach (values (%reader))
847     {
848       close($_);
849     }
850     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
851     open(STDOUT,">&writer");
852     open(STDERR,">&writer");
853
854     undef $dbh;
855     undef $sth;
856
857     delete $ENV{"GNUPGHOME"};
858     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
859     $ENV{"TASK_QSEQUENCE"} = $id;
860     $ENV{"TASK_SEQUENCE"} = $level;
861     $ENV{"JOB_SCRIPT"} = $Job->{script};
862     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
863       $param =~ tr/a-z/A-Z/;
864       $ENV{"JOB_PARAMETER_$param"} = $value;
865     }
866     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
867     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
868     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
869     $ENV{"HOME"} = $ENV{"TASK_WORK"};
870     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
871     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
872     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
873     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
874
875     $ENV{"GZIP"} = "-n";
876
877     my @srunargs = (
878       "srun",
879       "--nodelist=".$childnode->{name},
880       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
881       "--job-name=$job_id.$id.$$",
882         );
883     my $command =
884         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
885         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
886         ."&& cd $ENV{CRUNCH_TMP} "
887         # These environment variables get used explicitly later in
888         # $command.  No tool is expected to read these values directly.
889         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
890         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
891         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
892         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
893     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
894     if ($docker_hash)
895     {
896       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
897       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
898       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
899       # We only set memory limits if Docker lets us limit both memory and swap.
900       # Memory limits alone have been supported longer, but subprocesses tend
901       # to get SIGKILL if they exceed that without any swap limit set.
902       # See #5642 for additional background.
903       if ($docker_limitmem) {
904         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
905       }
906
907       # The source tree and $destdir directory (which we have
908       # installed on the worker host) are available in the container,
909       # under the same path.
910       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
911       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
912
913       # Currently, we make arv-mount's mount point appear at /keep
914       # inside the container (instead of using the same path as the
915       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
916       # crunch scripts and utilities must not rely on this. They must
917       # use $TASK_KEEPMOUNT.
918       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
919       $ENV{TASK_KEEPMOUNT} = "/keep";
920
921       # TASK_WORK is almost exactly like a docker data volume: it
922       # starts out empty, is writable, and persists until no
923       # containers use it any more. We don't use --volumes-from to
924       # share it with other containers: it is only accessible to this
925       # task, and it goes away when this task stops.
926       #
927       # However, a docker data volume is writable only by root unless
928       # the mount point already happens to exist in the container with
929       # different permissions. Therefore, we [1] assume /tmp already
930       # exists in the image and is writable by the crunch user; [2]
931       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
932       # writable if they are created by docker while setting up the
933       # other --volumes); and [3] create $TASK_WORK inside the
934       # container using $build_script.
935       $command .= "--volume=/tmp ";
936       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
937       $ENV{"HOME"} = $ENV{"TASK_WORK"};
938       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
939
940       # TODO: Share a single JOB_WORK volume across all task
941       # containers on a given worker node, and delete it when the job
942       # ends (and, in case that doesn't work, when the next job
943       # starts).
944       #
945       # For now, use the same approach as TASK_WORK above.
946       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
947
948       while (my ($env_key, $env_val) = each %ENV)
949       {
950         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
951           $command .= "--env=\Q$env_key=$env_val\E ";
952         }
953       }
954       $command .= "--env=\QHOME=$ENV{HOME}\E ";
955       $command .= "\Q$docker_hash\E ";
956
957       if ($Job->{arvados_sdk_version}) {
958         $command .= "stdbuf --output=0 --error=0 ";
959         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
960       } else {
961         $command .= "/bin/sh -c \'mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E\'";
962       }
963     } else {
964       # Non-docker run
965       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
966       $command .= "stdbuf --output=0 --error=0 ";
967       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
968     }
969
970     my @execargs = ('bash', '-c', $command);
971     srun (\@srunargs, \@execargs, undef, $build_script);
972     # exec() failed, we assume nothing happened.
973     die "srun() failed on build script\n";
974   }
975   close("writer");
976   if (!defined $childpid)
977   {
978     close $reader{$id};
979     delete $reader{$id};
980     next;
981   }
982   shift @freeslot;
983   $proc{$childpid} = { jobstep => $id,
984                        time => time,
985                        slot => $childslot,
986                        jobstepname => "$job_id.$id.$childpid",
987                      };
988   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
989   $slot[$childslot]->{pid} = $childpid;
990
991   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
992   Log ($id, "child $childpid started on $childslotname");
993   $Jobstep->{starttime} = time;
994   $Jobstep->{node} = $childnode->{name};
995   $Jobstep->{slotindex} = $childslot;
996   delete $Jobstep->{stderr};
997   delete $Jobstep->{finishtime};
998   delete $Jobstep->{tempfail};
999
1000   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1001   $Jobstep->{'arvados_task'}->save;
1002
1003   splice @jobstep_todo, $todo_ptr, 1;
1004   --$todo_ptr;
1005
1006   $progress_is_dirty = 1;
1007
1008   while (!@freeslot
1009          ||
1010          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1011   {
1012     last THISROUND if $main::please_freeze || defined($main::success);
1013     if ($main::please_info)
1014     {
1015       $main::please_info = 0;
1016       freeze();
1017       create_output_collection();
1018       save_meta(1);
1019       update_progress_stats();
1020     }
1021     my $gotsome
1022         = readfrompipes ()
1023         + reapchildren ();
1024     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1025     {
1026       check_refresh_wanted();
1027       check_squeue();
1028       update_progress_stats();
1029       select (undef, undef, undef, 0.1);
1030     }
1031     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1032     {
1033       update_progress_stats();
1034     }
1035     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1036                                         $_->{node}->{hold_count} < 4 } @slot);
1037     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1038         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1039     {
1040       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1041           .($thisround_failed+$thisround_succeeded)
1042           .") -- giving up on this round";
1043       Log (undef, $message);
1044       last THISROUND;
1045     }
1046
1047     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1048     for (my $i=$#freeslot; $i>=0; $i--) {
1049       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1050         push @holdslot, (splice @freeslot, $i, 1);
1051       }
1052     }
1053     for (my $i=$#holdslot; $i>=0; $i--) {
1054       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1055         push @freeslot, (splice @holdslot, $i, 1);
1056       }
1057     }
1058
1059     # give up if no nodes are succeeding
1060     if ($working_slot_count < 1) {
1061       Log(undef, "Every node has failed -- giving up");
1062       last THISROUND;
1063     }
1064   }
1065 }
1066
1067
1068 push @freeslot, splice @holdslot;
1069 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1070
1071
1072 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1073 while (%proc)
1074 {
1075   if ($main::please_continue) {
1076     $main::please_continue = 0;
1077     goto THISROUND;
1078   }
1079   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1080   readfrompipes ();
1081   if (!reapchildren())
1082   {
1083     check_refresh_wanted();
1084     check_squeue();
1085     update_progress_stats();
1086     select (undef, undef, undef, 0.1);
1087     killem (keys %proc) if $main::please_freeze;
1088   }
1089 }
1090
1091 update_progress_stats();
1092 freeze_if_want_freeze();
1093
1094
1095 if (!defined $main::success)
1096 {
1097   if (!@jobstep_todo) {
1098     $main::success = 1;
1099   } elsif ($working_slot_count < 1) {
1100     save_output_collection();
1101     save_meta();
1102     exit(EX_RETRY_UNLOCKED);
1103   } elsif ($thisround_succeeded == 0 &&
1104            ($thisround_failed == 0 || $thisround_failed > 4)) {
1105     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1106     Log (undef, $message);
1107     $main::success = 0;
1108   }
1109 }
1110
1111 goto ONELEVEL if !defined $main::success;
1112
1113
1114 release_allocation();
1115 freeze();
1116 my $collated_output = save_output_collection();
1117 Log (undef, "finish");
1118
1119 save_meta();
1120
1121 my $final_state;
1122 if ($collated_output && $main::success) {
1123   $final_state = 'Complete';
1124 } else {
1125   $final_state = 'Failed';
1126 }
1127 $Job->update_attributes('state' => $final_state);
1128
1129 exit (($final_state eq 'Complete') ? 0 : 1);
1130
1131
1132
1133 sub update_progress_stats
1134 {
1135   $progress_stats_updated = time;
1136   return if !$progress_is_dirty;
1137   my ($todo, $done, $running) = (scalar @jobstep_todo,
1138                                  scalar @jobstep_done,
1139                                  scalar keys(%proc));
1140   $Job->{'tasks_summary'} ||= {};
1141   $Job->{'tasks_summary'}->{'todo'} = $todo;
1142   $Job->{'tasks_summary'}->{'done'} = $done;
1143   $Job->{'tasks_summary'}->{'running'} = $running;
1144   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1145   Log (undef, "status: $done done, $running running, $todo todo");
1146   $progress_is_dirty = 0;
1147 }
1148
1149
1150
1151 sub reapchildren
1152 {
1153   my $pid = waitpid (-1, WNOHANG);
1154   return 0 if $pid <= 0;
1155
1156   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1157                   . "."
1158                   . $slot[$proc{$pid}->{slot}]->{cpu});
1159   my $jobstepid = $proc{$pid}->{jobstep};
1160   my $elapsed = time - $proc{$pid}->{time};
1161   my $Jobstep = $jobstep[$jobstepid];
1162
1163   my $childstatus = $?;
1164   my $exitvalue = $childstatus >> 8;
1165   my $exitinfo = "exit ".exit_status_s($childstatus);
1166   $Jobstep->{'arvados_task'}->reload;
1167   my $task_success = $Jobstep->{'arvados_task'}->{success};
1168
1169   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1170
1171   if (!defined $task_success) {
1172     # task did not indicate one way or the other --> fail
1173     $Jobstep->{'arvados_task'}->{success} = 0;
1174     $Jobstep->{'arvados_task'}->save;
1175     $task_success = 0;
1176   }
1177
1178   if (!$task_success)
1179   {
1180     my $temporary_fail;
1181     $temporary_fail ||= $Jobstep->{tempfail};
1182     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1183
1184     ++$thisround_failed;
1185     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1186
1187     # Check for signs of a failed or misconfigured node
1188     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1189         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1190       # Don't count this against jobstep failure thresholds if this
1191       # node is already suspected faulty and srun exited quickly
1192       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1193           $elapsed < 5) {
1194         Log ($jobstepid, "blaming failure on suspect node " .
1195              $slot[$proc{$pid}->{slot}]->{node}->{name});
1196         $temporary_fail ||= 1;
1197       }
1198       ban_node_by_slot($proc{$pid}->{slot});
1199     }
1200
1201     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1202                              ++$Jobstep->{'failures'},
1203                              $temporary_fail ? 'temporary' : 'permanent',
1204                              $elapsed));
1205
1206     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1207       # Give up on this task, and the whole job
1208       $main::success = 0;
1209     }
1210     # Put this task back on the todo queue
1211     push @jobstep_todo, $jobstepid;
1212     $Job->{'tasks_summary'}->{'failed'}++;
1213   }
1214   else
1215   {
1216     ++$thisround_succeeded;
1217     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1218     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1219     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1220     push @jobstep_done, $jobstepid;
1221     Log ($jobstepid, "success in $elapsed seconds");
1222   }
1223   $Jobstep->{exitcode} = $childstatus;
1224   $Jobstep->{finishtime} = time;
1225   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1226   $Jobstep->{'arvados_task'}->save;
1227   process_stderr ($jobstepid, $task_success);
1228   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1229                            length($Jobstep->{'arvados_task'}->{output}),
1230                            $Jobstep->{'arvados_task'}->{output}));
1231
1232   close $reader{$jobstepid};
1233   delete $reader{$jobstepid};
1234   delete $slot[$proc{$pid}->{slot}]->{pid};
1235   push @freeslot, $proc{$pid}->{slot};
1236   delete $proc{$pid};
1237
1238   if ($task_success) {
1239     # Load new tasks
1240     my $newtask_list = [];
1241     my $newtask_results;
1242     do {
1243       $newtask_results = api_call(
1244         "job_tasks/list",
1245         'where' => {
1246           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1247         },
1248         'order' => 'qsequence',
1249         'offset' => scalar(@$newtask_list),
1250       );
1251       push(@$newtask_list, @{$newtask_results->{items}});
1252     } while (@{$newtask_results->{items}});
1253     foreach my $arvados_task (@$newtask_list) {
1254       my $jobstep = {
1255         'level' => $arvados_task->{'sequence'},
1256         'failures' => 0,
1257         'arvados_task' => $arvados_task
1258       };
1259       push @jobstep, $jobstep;
1260       push @jobstep_todo, $#jobstep;
1261     }
1262   }
1263
1264   $progress_is_dirty = 1;
1265   1;
1266 }
1267
1268 sub check_refresh_wanted
1269 {
1270   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1271   if (@stat && $stat[9] > $latest_refresh) {
1272     $latest_refresh = scalar time;
1273     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1274     for my $attr ('cancelled_at',
1275                   'cancelled_by_user_uuid',
1276                   'cancelled_by_client_uuid',
1277                   'state') {
1278       $Job->{$attr} = $Job2->{$attr};
1279     }
1280     if ($Job->{'state'} ne "Running") {
1281       if ($Job->{'state'} eq "Cancelled") {
1282         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1283       } else {
1284         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1285       }
1286       $main::success = 0;
1287       $main::please_freeze = 1;
1288     }
1289   }
1290 }
1291
1292 sub check_squeue
1293 {
1294   my $last_squeue_check = $squeue_checked;
1295
1296   # Do not call `squeue` or check the kill list more than once every
1297   # 15 seconds.
1298   return if $last_squeue_check > time - 15;
1299   $squeue_checked = time;
1300
1301   # Look for children from which we haven't received stderr data since
1302   # the last squeue check. If no such children exist, all procs are
1303   # alive and there's no need to even look at squeue.
1304   #
1305   # As long as the crunchstat poll interval (10s) is shorter than the
1306   # squeue check interval (15s) this should make the squeue check an
1307   # infrequent event.
1308   my $silent_procs = 0;
1309   for my $jobstep (values %proc)
1310   {
1311     if ($jobstep->{stderr_at} < $last_squeue_check)
1312     {
1313       $silent_procs++;
1314     }
1315   }
1316   return if $silent_procs == 0;
1317
1318   # use killem() on procs whose killtime is reached
1319   while (my ($pid, $jobstep) = each %proc)
1320   {
1321     if (exists $jobstep->{killtime}
1322         && $jobstep->{killtime} <= time
1323         && $jobstep->{stderr_at} < $last_squeue_check)
1324     {
1325       my $sincewhen = "";
1326       if ($jobstep->{stderr_at}) {
1327         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1328       }
1329       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1330       killem ($pid);
1331     }
1332   }
1333
1334   if (!$have_slurm)
1335   {
1336     # here is an opportunity to check for mysterious problems with local procs
1337     return;
1338   }
1339
1340   # Get a list of steps still running.  Note: squeue(1) says --steps
1341   # selects a format (which we override anyway) and allows us to
1342   # specify which steps we're interested in (which we don't).
1343   # Importantly, it also changes the meaning of %j from "job name" to
1344   # "step name" and (although this isn't mentioned explicitly in the
1345   # docs) switches from "one line per job" mode to "one line per step"
1346   # mode. Without it, we'd just get a list of one job, instead of a
1347   # list of N steps.
1348   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1349   if ($? != 0)
1350   {
1351     Log(undef, "warning: squeue exit status $? ($!)");
1352     return;
1353   }
1354   chop @squeue;
1355
1356   # which of my jobsteps are running, according to squeue?
1357   my %ok;
1358   for my $jobstepname (@squeue)
1359   {
1360     $ok{$jobstepname} = 1;
1361   }
1362
1363   # Check for child procs >60s old and not mentioned by squeue.
1364   while (my ($pid, $jobstep) = each %proc)
1365   {
1366     if ($jobstep->{time} < time - 60
1367         && $jobstep->{jobstepname}
1368         && !exists $ok{$jobstep->{jobstepname}}
1369         && !exists $jobstep->{killtime})
1370     {
1371       # According to slurm, this task has ended (successfully or not)
1372       # -- but our srun child hasn't exited. First we must wait (30
1373       # seconds) in case this is just a race between communication
1374       # channels. Then, if our srun child process still hasn't
1375       # terminated, we'll conclude some slurm communication
1376       # error/delay has caused the task to die without notifying srun,
1377       # and we'll kill srun ourselves.
1378       $jobstep->{killtime} = time + 30;
1379       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1380     }
1381   }
1382 }
1383
1384
1385 sub release_allocation
1386 {
1387   if ($have_slurm)
1388   {
1389     Log (undef, "release job allocation");
1390     system "scancel $ENV{SLURM_JOB_ID}";
1391   }
1392 }
1393
1394
1395 sub readfrompipes
1396 {
1397   my $gotsome = 0;
1398   foreach my $job (keys %reader)
1399   {
1400     my $buf;
1401     while (0 < sysread ($reader{$job}, $buf, 8192))
1402     {
1403       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1404       $jobstep[$job]->{stderr_at} = time;
1405       $jobstep[$job]->{stderr} .= $buf;
1406       preprocess_stderr ($job);
1407       if (length ($jobstep[$job]->{stderr}) > 16384)
1408       {
1409         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1410       }
1411       $gotsome = 1;
1412     }
1413   }
1414   return $gotsome;
1415 }
1416
1417
1418 sub preprocess_stderr
1419 {
1420   my $job = shift;
1421
1422   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1423     my $line = $1;
1424     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1425     Log ($job, "stderr $line");
1426     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1427       # whoa.
1428       $main::please_freeze = 1;
1429     }
1430     elsif ($line =~ /srun: error: Node failure on/) {
1431       my $job_slot_index = $jobstep[$job]->{slotindex};
1432       $slot[$job_slot_index]->{node}->{fail_count}++;
1433       $jobstep[$job]->{tempfail} = 1;
1434       ban_node_by_slot($job_slot_index);
1435     }
1436     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1437       $jobstep[$job]->{tempfail} = 1;
1438       ban_node_by_slot($jobstep[$job]->{slotindex});
1439     }
1440     elsif ($line =~ /arvados\.errors\.Keep/) {
1441       $jobstep[$job]->{tempfail} = 1;
1442     }
1443   }
1444 }
1445
1446
1447 sub process_stderr
1448 {
1449   my $job = shift;
1450   my $task_success = shift;
1451   preprocess_stderr ($job);
1452
1453   map {
1454     Log ($job, "stderr $_");
1455   } split ("\n", $jobstep[$job]->{stderr});
1456 }
1457
1458 sub fetch_block
1459 {
1460   my $hash = shift;
1461   my $keep;
1462   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1463     Log(undef, "fetch_block run error from arv-get $hash: $!");
1464     return undef;
1465   }
1466   my $output_block = "";
1467   while (1) {
1468     my $buf;
1469     my $bytes = sysread($keep, $buf, 1024 * 1024);
1470     if (!defined $bytes) {
1471       Log(undef, "fetch_block read error from arv-get: $!");
1472       $output_block = undef;
1473       last;
1474     } elsif ($bytes == 0) {
1475       # sysread returns 0 at the end of the pipe.
1476       last;
1477     } else {
1478       # some bytes were read into buf.
1479       $output_block .= $buf;
1480     }
1481   }
1482   close $keep;
1483   if ($?) {
1484     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1485     $output_block = undef;
1486   }
1487   return $output_block;
1488 }
1489
1490 # Create a collection by concatenating the output of all tasks (each
1491 # task's output is either a manifest fragment, a locator for a
1492 # manifest fragment stored in Keep, or nothing at all). Return the
1493 # portable_data_hash of the new collection.
1494 sub create_output_collection
1495 {
1496   Log (undef, "collate");
1497
1498   my ($child_out, $child_in);
1499   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1500 import arvados
1501 import sys
1502 print (arvados.api("v1").collections().
1503        create(body={"manifest_text": sys.stdin.read()}).
1504        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1505 }, retry_count());
1506
1507   my $task_idx = -1;
1508   my $manifest_size = 0;
1509   for (@jobstep)
1510   {
1511     ++$task_idx;
1512     my $output = $_->{'arvados_task'}->{output};
1513     next if (!defined($output));
1514     my $next_write;
1515     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1516       $next_write = fetch_block($output);
1517     } else {
1518       $next_write = $output;
1519     }
1520     if (defined($next_write)) {
1521       if (!defined(syswrite($child_in, $next_write))) {
1522         # There's been an error writing.  Stop the loop.
1523         # We'll log details about the exit code later.
1524         last;
1525       } else {
1526         $manifest_size += length($next_write);
1527       }
1528     } else {
1529       my $uuid = $_->{'arvados_task'}->{'uuid'};
1530       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1531       $main::success = 0;
1532     }
1533   }
1534   close($child_in);
1535   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1536
1537   my $joboutput;
1538   my $s = IO::Select->new($child_out);
1539   if ($s->can_read(120)) {
1540     sysread($child_out, $joboutput, 1024 * 1024);
1541     waitpid($pid, 0);
1542     if ($?) {
1543       Log(undef, "output collection creation exited " . exit_status_s($?));
1544       $joboutput = undef;
1545     } else {
1546       chomp($joboutput);
1547     }
1548   } else {
1549     Log (undef, "timed out while creating output collection");
1550     foreach my $signal (2, 2, 2, 15, 15, 9) {
1551       kill($signal, $pid);
1552       last if waitpid($pid, WNOHANG) == -1;
1553       sleep(1);
1554     }
1555   }
1556   close($child_out);
1557
1558   return $joboutput;
1559 }
1560
1561 # Calls create_output_collection, logs the result, and returns it.
1562 # If that was successful, save that as the output in the job record.
1563 sub save_output_collection {
1564   my $collated_output = create_output_collection();
1565
1566   if (!$collated_output) {
1567     Log(undef, "Failed to write output collection");
1568   }
1569   else {
1570     Log(undef, "job output $collated_output");
1571     $Job->update_attributes('output' => $collated_output);
1572   }
1573   return $collated_output;
1574 }
1575
1576 sub killem
1577 {
1578   foreach (@_)
1579   {
1580     my $sig = 2;                # SIGINT first
1581     if (exists $proc{$_}->{"sent_$sig"} &&
1582         time - $proc{$_}->{"sent_$sig"} > 4)
1583     {
1584       $sig = 15;                # SIGTERM if SIGINT doesn't work
1585     }
1586     if (exists $proc{$_}->{"sent_$sig"} &&
1587         time - $proc{$_}->{"sent_$sig"} > 4)
1588     {
1589       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1590     }
1591     if (!exists $proc{$_}->{"sent_$sig"})
1592     {
1593       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1594       kill $sig, $_;
1595       select (undef, undef, undef, 0.1);
1596       if ($sig == 2)
1597       {
1598         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1599       }
1600       $proc{$_}->{"sent_$sig"} = time;
1601       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1602     }
1603   }
1604 }
1605
1606
1607 sub fhbits
1608 {
1609   my($bits);
1610   for (@_) {
1611     vec($bits,fileno($_),1) = 1;
1612   }
1613   $bits;
1614 }
1615
1616
1617 # Send log output to Keep via arv-put.
1618 #
1619 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1620 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1621 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1622 # $log_pipe_pid is the pid of the arv-put subprocess.
1623 #
1624 # The only functions that should access these variables directly are:
1625 #
1626 # log_writer_start($logfilename)
1627 #     Starts an arv-put pipe, reading data on stdin and writing it to
1628 #     a $logfilename file in an output collection.
1629 #
1630 # log_writer_read_output([$timeout])
1631 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1632 #     Passes $timeout to the select() call, with a default of 0.01.
1633 #     Returns the result of the last read() call on $log_pipe_out, or
1634 #     -1 if read() wasn't called because select() timed out.
1635 #     Only other log_writer_* functions should need to call this.
1636 #
1637 # log_writer_send($txt)
1638 #     Writes $txt to the output log collection.
1639 #
1640 # log_writer_finish()
1641 #     Closes the arv-put pipe and returns the output that it produces.
1642 #
1643 # log_writer_is_active()
1644 #     Returns a true value if there is currently a live arv-put
1645 #     process, false otherwise.
1646 #
1647 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1648     $log_pipe_pid);
1649
1650 sub log_writer_start($)
1651 {
1652   my $logfilename = shift;
1653   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1654                         'arv-put',
1655                         '--stream',
1656                         '--retries', '3',
1657                         '--filename', $logfilename,
1658                         '-');
1659   $log_pipe_out_buf = "";
1660   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1661 }
1662
1663 sub log_writer_read_output {
1664   my $timeout = shift || 0.01;
1665   my $read = -1;
1666   while ($read && $log_pipe_out_select->can_read($timeout)) {
1667     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1668                  length($log_pipe_out_buf));
1669   }
1670   if (!defined($read)) {
1671     Log(undef, "error reading log manifest from arv-put: $!");
1672   }
1673   return $read;
1674 }
1675
1676 sub log_writer_send($)
1677 {
1678   my $txt = shift;
1679   print $log_pipe_in $txt;
1680   log_writer_read_output();
1681 }
1682
1683 sub log_writer_finish()
1684 {
1685   return unless $log_pipe_pid;
1686
1687   close($log_pipe_in);
1688
1689   my $read_result = log_writer_read_output(120);
1690   if ($read_result == -1) {
1691     Log (undef, "timed out reading from 'arv-put'");
1692   } elsif ($read_result != 0) {
1693     Log(undef, "failed to read arv-put log manifest to EOF");
1694   }
1695
1696   waitpid($log_pipe_pid, 0);
1697   if ($?) {
1698     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1699   }
1700
1701   close($log_pipe_out);
1702   my $arv_put_output = $log_pipe_out_buf;
1703   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1704       $log_pipe_out_select = undef;
1705
1706   return $arv_put_output;
1707 }
1708
1709 sub log_writer_is_active() {
1710   return $log_pipe_pid;
1711 }
1712
1713 sub Log                         # ($jobstep_id, $logmessage)
1714 {
1715   if ($_[1] =~ /\n/) {
1716     for my $line (split (/\n/, $_[1])) {
1717       Log ($_[0], $line);
1718     }
1719     return;
1720   }
1721   my $fh = select STDERR; $|=1; select $fh;
1722   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1723   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1724   $message .= "\n";
1725   my $datetime;
1726   if (log_writer_is_active() || -t STDERR) {
1727     my @gmtime = gmtime;
1728     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1729                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1730   }
1731   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1732
1733   if (log_writer_is_active()) {
1734     log_writer_send($datetime . " " . $message);
1735   }
1736 }
1737
1738
1739 sub croak
1740 {
1741   my ($package, $file, $line) = caller;
1742   my $message = "@_ at $file line $line\n";
1743   Log (undef, $message);
1744   freeze() if @jobstep_todo;
1745   create_output_collection() if @jobstep_todo;
1746   cleanup();
1747   save_meta();
1748   die;
1749 }
1750
1751
1752 sub cleanup
1753 {
1754   return unless $Job;
1755   if ($Job->{'state'} eq 'Cancelled') {
1756     $Job->update_attributes('finished_at' => scalar gmtime);
1757   } else {
1758     $Job->update_attributes('state' => 'Failed');
1759   }
1760 }
1761
1762
1763 sub save_meta
1764 {
1765   my $justcheckpoint = shift; # false if this will be the last meta saved
1766   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1767   return unless log_writer_is_active();
1768
1769   my $log_manifest = "";
1770   if ($Job->{log}) {
1771     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1772     $log_manifest .= $prev_log_coll->{manifest_text};
1773   }
1774   $log_manifest .= log_writer_finish();
1775
1776   my $log_coll = api_call(
1777     "collections/create", ensure_unique_name => 1, collection => {
1778       manifest_text => $log_manifest,
1779       owner_uuid => $Job->{owner_uuid},
1780       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1781     });
1782   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1783   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1784 }
1785
1786
1787 sub freeze_if_want_freeze
1788 {
1789   if ($main::please_freeze)
1790   {
1791     release_allocation();
1792     if (@_)
1793     {
1794       # kill some srun procs before freeze+stop
1795       map { $proc{$_} = {} } @_;
1796       while (%proc)
1797       {
1798         killem (keys %proc);
1799         select (undef, undef, undef, 0.1);
1800         my $died;
1801         while (($died = waitpid (-1, WNOHANG)) > 0)
1802         {
1803           delete $proc{$died};
1804         }
1805       }
1806     }
1807     freeze();
1808     create_output_collection();
1809     cleanup();
1810     save_meta();
1811     exit 1;
1812   }
1813 }
1814
1815
1816 sub freeze
1817 {
1818   Log (undef, "Freeze not implemented");
1819   return;
1820 }
1821
1822
1823 sub thaw
1824 {
1825   croak ("Thaw not implemented");
1826 }
1827
1828
1829 sub freezequote
1830 {
1831   my $s = shift;
1832   $s =~ s/\\/\\\\/g;
1833   $s =~ s/\n/\\n/g;
1834   return $s;
1835 }
1836
1837
1838 sub freezeunquote
1839 {
1840   my $s = shift;
1841   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1842   return $s;
1843 }
1844
1845
1846 sub srun
1847 {
1848   my $srunargs = shift;
1849   my $execargs = shift;
1850   my $opts = shift || {};
1851   my $stdin = shift;
1852   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1853
1854   $Data::Dumper::Terse = 1;
1855   $Data::Dumper::Indent = 0;
1856   my $show_cmd = Dumper($args);
1857   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1858   $show_cmd =~ s/\n/ /g;
1859   if ($opts->{fork}) {
1860     Log(undef, "starting: $show_cmd");
1861   } else {
1862     # This is a child process: parent is in charge of reading our
1863     # stderr and copying it to Log() if needed.
1864     warn "starting: $show_cmd\n";
1865   }
1866
1867   if (defined $stdin) {
1868     my $child = open STDIN, "-|";
1869     defined $child or die "no fork: $!";
1870     if ($child == 0) {
1871       print $stdin or die $!;
1872       close STDOUT or die $!;
1873       exit 0;
1874     }
1875   }
1876
1877   return system (@$args) if $opts->{fork};
1878
1879   exec @$args;
1880   warn "ENV size is ".length(join(" ",%ENV));
1881   die "exec failed: $!: @$args";
1882 }
1883
1884
1885 sub ban_node_by_slot {
1886   # Don't start any new jobsteps on this node for 60 seconds
1887   my $slotid = shift;
1888   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1889   $slot[$slotid]->{node}->{hold_count}++;
1890   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1891 }
1892
1893 sub must_lock_now
1894 {
1895   my ($lockfile, $error_message) = @_;
1896   open L, ">", $lockfile or croak("$lockfile: $!");
1897   if (!flock L, LOCK_EX|LOCK_NB) {
1898     croak("Can't lock $lockfile: $error_message\n");
1899   }
1900 }
1901
1902 sub find_docker_image {
1903   # Given a Keep locator, check to see if it contains a Docker image.
1904   # If so, return its stream name and Docker hash.
1905   # If not, return undef for both values.
1906   my $locator = shift;
1907   my ($streamname, $filename);
1908   my $image = api_call("collections/get", uuid => $locator);
1909   if ($image) {
1910     foreach my $line (split(/\n/, $image->{manifest_text})) {
1911       my @tokens = split(/\s+/, $line);
1912       next if (!@tokens);
1913       $streamname = shift(@tokens);
1914       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1915         if (defined($filename)) {
1916           return (undef, undef);  # More than one file in the Collection.
1917         } else {
1918           $filename = (split(/:/, $filedata, 3))[2];
1919         }
1920       }
1921     }
1922   }
1923   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1924     return ($streamname, $1);
1925   } else {
1926     return (undef, undef);
1927   }
1928 }
1929
1930 sub retry_count {
1931   # Calculate the number of times an operation should be retried,
1932   # assuming exponential backoff, and that we're willing to retry as
1933   # long as tasks have been running.  Enforce a minimum of 3 retries.
1934   my ($starttime, $endtime, $timediff, $retries);
1935   if (@jobstep) {
1936     $starttime = $jobstep[0]->{starttime};
1937     $endtime = $jobstep[-1]->{finishtime};
1938   }
1939   if (!defined($starttime)) {
1940     $timediff = 0;
1941   } elsif (!defined($endtime)) {
1942     $timediff = time - $starttime;
1943   } else {
1944     $timediff = ($endtime - $starttime) - (time - $endtime);
1945   }
1946   if ($timediff > 0) {
1947     $retries = int(log($timediff) / log(2));
1948   } else {
1949     $retries = 1;  # Use the minimum.
1950   }
1951   return ($retries > 3) ? $retries : 3;
1952 }
1953
1954 sub retry_op {
1955   # Pass in two function references.
1956   # This method will be called with the remaining arguments.
1957   # If it dies, retry it with exponential backoff until it succeeds,
1958   # or until the current retry_count is exhausted.  After each failure
1959   # that can be retried, the second function will be called with
1960   # the current try count (0-based), next try time, and error message.
1961   my $operation = shift;
1962   my $retry_callback = shift;
1963   my $retries = retry_count();
1964   foreach my $try_count (0..$retries) {
1965     my $next_try = time + (2 ** $try_count);
1966     my $result = eval { $operation->(@_); };
1967     if (!$@) {
1968       return $result;
1969     } elsif ($try_count < $retries) {
1970       $retry_callback->($try_count, $next_try, $@);
1971       my $sleep_time = $next_try - time;
1972       sleep($sleep_time) if ($sleep_time > 0);
1973     }
1974   }
1975   # Ensure the error message ends in a newline, so Perl doesn't add
1976   # retry_op's line number to it.
1977   chomp($@);
1978   die($@ . "\n");
1979 }
1980
1981 sub api_call {
1982   # Pass in a /-separated API method name, and arguments for it.
1983   # This function will call that method, retrying as needed until
1984   # the current retry_count is exhausted, with a log on the first failure.
1985   my $method_name = shift;
1986   my $log_api_retry = sub {
1987     my ($try_count, $next_try_at, $errmsg) = @_;
1988     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1989     $errmsg =~ s/\s/ /g;
1990     $errmsg =~ s/\s+$//;
1991     my $retry_msg;
1992     if ($next_try_at < time) {
1993       $retry_msg = "Retrying.";
1994     } else {
1995       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1996       $retry_msg = "Retrying at $next_try_fmt.";
1997     }
1998     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1999   };
2000   my $method = $arv;
2001   foreach my $key (split(/\//, $method_name)) {
2002     $method = $method->{$key};
2003   }
2004   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2005 }
2006
2007 sub exit_status_s {
2008   # Given a $?, return a human-readable exit code string like "0" or
2009   # "1" or "0 with signal 1" or "1 with signal 11".
2010   my $exitcode = shift;
2011   my $s = $exitcode >> 8;
2012   if ($exitcode & 0x7f) {
2013     $s .= " with signal " . ($exitcode & 0x7f);
2014   }
2015   if ($exitcode & 0x80) {
2016     $s .= " with core dump";
2017   }
2018   return $s;
2019 }
2020
2021 sub handle_readall {
2022   # Pass in a glob reference to a file handle.
2023   # Read all its contents and return them as a string.
2024   my $fh_glob_ref = shift;
2025   local $/ = undef;
2026   return <$fh_glob_ref>;
2027 }
2028
2029 sub tar_filename_n {
2030   my $n = shift;
2031   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2032 }
2033
2034 sub add_git_archive {
2035   # Pass in a git archive command as a string or list, a la system().
2036   # This method will save its output to be included in the archive sent to the
2037   # build script.
2038   my $git_input;
2039   $git_tar_count++;
2040   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2041     croak("Failed to save git archive: $!");
2042   }
2043   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2044   close($git_input);
2045   waitpid($git_pid, 0);
2046   close(GIT_ARCHIVE);
2047   if ($?) {
2048     croak("Failed to save git archive: git exited " . exit_status_s($?));
2049   }
2050 }
2051
2052 sub combined_git_archive {
2053   # Combine all saved tar archives into a single archive, then return its
2054   # contents in a string.  Return undef if no archives have been saved.
2055   if ($git_tar_count < 1) {
2056     return undef;
2057   }
2058   my $base_tar_name = tar_filename_n(1);
2059   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2060     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2061     if ($tar_exit != 0) {
2062       croak("Error preparing build archive: tar -A exited " .
2063             exit_status_s($tar_exit));
2064     }
2065   }
2066   if (!open(GIT_TAR, "<", $base_tar_name)) {
2067     croak("Could not open build archive: $!");
2068   }
2069   my $tar_contents = handle_readall(\*GIT_TAR);
2070   close(GIT_TAR);
2071   return $tar_contents;
2072 }
2073
2074 sub set_nonblocking {
2075   my $fh = shift;
2076   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2077   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2078 }
2079
2080 __DATA__
2081 #!/usr/bin/env perl
2082 #
2083 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2084 # server invokes this script on individual compute nodes, or localhost if we're
2085 # running a job locally.  It gets called in two modes:
2086 #
2087 # * No arguments: Installation mode.  Read a tar archive from the DATA
2088 #   file handle; it includes the Crunch script's source code, and
2089 #   maybe SDKs as well.  Those should be installed in the proper
2090 #   locations.  This runs outside of any Docker container, so don't try to
2091 #   introspect Crunch's runtime environment.
2092 #
2093 # * With arguments: Crunch script run mode.  This script should set up the
2094 #   environment, then run the command specified in the arguments.  This runs
2095 #   inside any Docker container.
2096
2097 use Fcntl ':flock';
2098 use File::Path qw( make_path remove_tree );
2099 use POSIX qw(getcwd);
2100
2101 use constant TASK_TEMPFAIL => 111;
2102
2103 # Map SDK subdirectories to the path environments they belong to.
2104 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2105
2106 my $destdir = $ENV{"CRUNCH_SRC"};
2107 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2108 my $repo = $ENV{"CRUNCH_SRC_URL"};
2109 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2110 my $job_work = $ENV{"JOB_WORK"};
2111 my $task_work = $ENV{"TASK_WORK"};
2112
2113 open(STDOUT_ORIG, ">&", STDOUT);
2114 open(STDERR_ORIG, ">&", STDERR);
2115
2116 for my $dir ($destdir, $job_work, $task_work) {
2117   if ($dir) {
2118     make_path $dir;
2119     -e $dir or die "Failed to create temporary directory ($dir): $!";
2120   }
2121 }
2122
2123 if ($task_work) {
2124   remove_tree($task_work, {keep_root => 1});
2125 }
2126
2127 ### Crunch script run mode
2128 if (@ARGV) {
2129   # We want to do routine logging during task 0 only.  This gives the user
2130   # the information they need, but avoids repeating the information for every
2131   # task.
2132   my $Log;
2133   if ($ENV{TASK_SEQUENCE} eq "0") {
2134     $Log = sub {
2135       my $msg = shift;
2136       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2137     };
2138   } else {
2139     $Log = sub { };
2140   }
2141
2142   my $python_src = "$install_dir/python";
2143   my $venv_dir = "$job_work/.arvados.venv";
2144   my $venv_built = -e "$venv_dir/bin/activate";
2145   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2146     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2147                  "--python=python2.7", $venv_dir);
2148     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2149     $venv_built = 1;
2150     $Log->("Built Python SDK virtualenv");
2151   }
2152
2153   my $pip_bin = "pip";
2154   if ($venv_built) {
2155     $Log->("Running in Python SDK virtualenv");
2156     $pip_bin = "$venv_dir/bin/pip";
2157     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2158     @ARGV = ("/bin/sh", "-ec",
2159              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2160   } elsif (-d $python_src) {
2161     $Log->("Warning: virtualenv not found inside Docker container default " .
2162            "\$PATH. Can't install Python SDK.");
2163   }
2164
2165   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2166   if ($pkgs) {
2167     $Log->("Using Arvados SDK:");
2168     foreach my $line (split /\n/, $pkgs) {
2169       $Log->($line);
2170     }
2171   } else {
2172     $Log->("Arvados SDK packages not found");
2173   }
2174
2175   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2176     my $sdk_path = "$install_dir/$sdk_dir";
2177     if (-d $sdk_path) {
2178       if ($ENV{$sdk_envkey}) {
2179         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2180       } else {
2181         $ENV{$sdk_envkey} = $sdk_path;
2182       }
2183       $Log->("Arvados SDK added to %s", $sdk_envkey);
2184     }
2185   }
2186
2187   exec(@ARGV);
2188   die "Cannot exec `@ARGV`: $!";
2189 }
2190
2191 ### Installation mode
2192 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2193 flock L, LOCK_EX;
2194 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2195   # This exact git archive (source + arvados sdk) is already installed
2196   # here, so there's no need to reinstall it.
2197
2198   # We must consume our DATA section, though: otherwise the process
2199   # feeding it to us will get SIGPIPE.
2200   my $buf;
2201   while (read(DATA, $buf, 65536)) { }
2202
2203   exit(0);
2204 }
2205
2206 unlink "$destdir.archive_hash";
2207 mkdir $destdir;
2208
2209 do {
2210   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2211   local $SIG{PIPE} = "IGNORE";
2212   warn "Extracting archive: $archive_hash\n";
2213   # --ignore-zeros is necessary sometimes: depending on how much NUL
2214   # padding tar -A put on our combined archive (which in turn depends
2215   # on the length of the component archives) tar without
2216   # --ignore-zeros will exit before consuming stdin and cause close()
2217   # to fail on the resulting SIGPIPE.
2218   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2219     die "Error launching 'tar -xC $destdir': $!";
2220   }
2221   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2222   # get SIGPIPE.  We must feed it data incrementally.
2223   my $tar_input;
2224   while (read(DATA, $tar_input, 65536)) {
2225     print TARX $tar_input;
2226   }
2227   if(!close(TARX)) {
2228     die "'tar -xC $destdir' exited $?: $!";
2229   }
2230 };
2231
2232 mkdir $install_dir;
2233
2234 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2235 if (-d $sdk_root) {
2236   foreach my $sdk_lang (("python",
2237                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2238     if (-d "$sdk_root/$sdk_lang") {
2239       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2240         die "Failed to install $sdk_lang SDK: $!";
2241       }
2242     }
2243   }
2244 }
2245
2246 my $python_dir = "$install_dir/python";
2247 if ((-d $python_dir) and can_run("python2.7")) {
2248   open(my $egg_info_pipe, "-|",
2249        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2250   my @egg_info_errors = <$egg_info_pipe>;
2251   close($egg_info_pipe);
2252
2253   if ($?) {
2254     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2255       # egg_info apparently failed because it couldn't ask git for a build tag.
2256       # Specify no build tag.
2257       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2258       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2259       close($pysdk_cfg);
2260     } else {
2261       my $egg_info_exit = $? >> 8;
2262       foreach my $errline (@egg_info_errors) {
2263         warn $errline;
2264       }
2265       warn "python setup.py egg_info failed: exit $egg_info_exit";
2266       exit ($egg_info_exit || 1);
2267     }
2268   }
2269 }
2270
2271 # Hide messages from the install script (unless it fails: shell_or_die
2272 # will show $destdir.log in that case).
2273 open(STDOUT, ">>", "$destdir.log");
2274 open(STDERR, ">&", STDOUT);
2275
2276 if (-e "$destdir/crunch_scripts/install") {
2277     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2278 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2279     # Old version
2280     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2281 } elsif (-e "./install.sh") {
2282     shell_or_die (undef, "./install.sh", $install_dir);
2283 }
2284
2285 if ($archive_hash) {
2286     unlink "$destdir.archive_hash.new";
2287     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2288     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2289 }
2290
2291 close L;
2292
2293 sub can_run {
2294   my $command_name = shift;
2295   open(my $which, "-|", "which", $command_name);
2296   while (<$which>) { }
2297   close($which);
2298   return ($? == 0);
2299 }
2300
2301 sub shell_or_die
2302 {
2303   my $exitcode = shift;
2304
2305   if ($ENV{"DEBUG"}) {
2306     print STDERR "@_\n";
2307   }
2308   if (system (@_) != 0) {
2309     my $err = $!;
2310     my $code = $?;
2311     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2312     open STDERR, ">&STDERR_ORIG";
2313     system ("cat $destdir.log >&2");
2314     warn "@_ failed ($err): $exitstatus";
2315     if (defined($exitcode)) {
2316       exit $exitcode;
2317     }
2318     else {
2319       exit (($code >> 8) || 1);
2320     }
2321   }
2322 }
2323
2324 __DATA__