Merge branch 'master' into 7490-datamanager-dont-die-return-error
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 GetOptions('force-unlock' => \$force_unlock,
131            'git-dir=s' => \$git_dir,
132            'job=s' => \$jobspec,
133            'job-api-token=s' => \$job_api_token,
134            'no-clear-tmp' => \$no_clear_tmp,
135            'resume-stash=s' => \$resume_stash,
136            'docker-bin=s' => \$docker_bin,
137     );
138
139 if (defined $job_api_token) {
140   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
141 }
142
143 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
144
145
146 $SIG{'USR1'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 1;
149 };
150 $SIG{'USR2'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 0;
153 };
154
155 my $arv = Arvados->new('apiVersion' => 'v1');
156
157 my $Job;
158 my $job_id;
159 my $dbh;
160 my $sth;
161 my @jobstep;
162
163 my $local_job;
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   $local_job = 0;
169 }
170 else
171 {
172   $local_job = JSON::decode_json($jobspec);
173 }
174
175
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely
178 # misconfigured.
179 my $cmd = ['true'];
180 if (($Job || $local_job)->{docker_image_locator}) {
181   $cmd = [$docker_bin, 'ps', '-q'];
182 }
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
185      $cmd,
186      {fork => 1});
187 if ($? != 0) {
188   Log(undef, "Sanity check failed: ".exit_status_s($?));
189   exit EX_TEMPFAIL;
190 }
191 Log(undef, "Sanity check OK");
192
193
194 my $User = api_call("users/current");
195
196 if (!$local_job) {
197   if (!$force_unlock) {
198     # Claim this job, and make sure nobody else does
199     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
200     if ($@) {
201       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
202       exit EX_TEMPFAIL;
203     };
204   }
205 }
206 else
207 {
208   if (!$resume_stash)
209   {
210     map { croak ("No $_ specified") unless $local_job->{$_} }
211     qw(script script_version script_parameters);
212   }
213
214   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
215   $local_job->{'started_at'} = gmtime;
216   $local_job->{'state'} = 'Running';
217
218   $Job = api_call("jobs/create", job => $local_job);
219 }
220 $job_id = $Job->{'uuid'};
221
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
224
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
228
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
230 if ($? == 0) {
231   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232   chomp($gem_versions);
233   chop($gem_versions);  # Closing parentheses
234 } else {
235   $gem_versions = "";
236 }
237 Log(undef,
238     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
239
240 Log (undef, "check slurm allocation");
241 my @slot;
242 my @node;
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
244 my @sinfo;
245 if (!$have_slurm)
246 {
247   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248   push @sinfo, "$localcpus localhost";
249 }
250 if (exists $ENV{SLURM_NODELIST})
251 {
252   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
253 }
254 foreach (@sinfo)
255 {
256   my ($ncpus, $slurm_nodelist) = split;
257   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
258
259   my @nodelist;
260   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
261   {
262     my $nodelist = $1;
263     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
264     {
265       my $ranges = $1;
266       foreach (split (",", $ranges))
267       {
268         my ($a, $b);
269         if (/(\d+)-(\d+)/)
270         {
271           $a = $1;
272           $b = $2;
273         }
274         else
275         {
276           $a = $_;
277           $b = $_;
278         }
279         push @nodelist, map {
280           my $n = $nodelist;
281           $n =~ s/\[[-,\d]+\]/$_/;
282           $n;
283         } ($a..$b);
284       }
285     }
286     else
287     {
288       push @nodelist, $nodelist;
289     }
290   }
291   foreach my $nodename (@nodelist)
292   {
293     Log (undef, "node $nodename - $ncpus slots");
294     my $node = { name => $nodename,
295                  ncpus => $ncpus,
296                  # The number of consecutive times a task has been dispatched
297                  # to this node and failed.
298                  losing_streak => 0,
299                  # The number of consecutive times that SLURM has reported
300                  # a node failure since the last successful task.
301                  fail_count => 0,
302                  # Don't dispatch work to this node until this time
303                  # (in seconds since the epoch) has passed.
304                  hold_until => 0 };
305     foreach my $cpu (1..$ncpus)
306     {
307       push @slot, { node => $node,
308                     cpu => $cpu };
309     }
310   }
311   push @node, @nodelist;
312 }
313
314
315
316 # Ensure that we get one jobstep running on each allocated node before
317 # we start overloading nodes with concurrent steps
318
319 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
320
321
322 $Job->update_attributes(
323   'tasks_summary' => { 'failed' => 0,
324                        'todo' => 1,
325                        'running' => 0,
326                        'done' => 0 });
327
328 Log (undef, "start");
329 $SIG{'INT'} = sub { $main::please_freeze = 1; };
330 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
331 $SIG{'TERM'} = \&croak;
332 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
333 $SIG{'ALRM'} = sub { $main::please_info = 1; };
334 $SIG{'CONT'} = sub { $main::please_continue = 1; };
335 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
336
337 $main::please_freeze = 0;
338 $main::please_info = 0;
339 $main::please_continue = 0;
340 $main::please_refresh = 0;
341 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
342
343 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
344 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
345 $ENV{"JOB_UUID"} = $job_id;
346
347
348 my @jobstep_todo = ();
349 my @jobstep_done = ();
350 my @jobstep_tomerge = ();
351 my $jobstep_tomerge_level = 0;
352 my $squeue_checked = 0;
353 my $latest_refresh = scalar time;
354
355
356
357 if (defined $Job->{thawedfromkey})
358 {
359   thaw ($Job->{thawedfromkey});
360 }
361 else
362 {
363   my $first_task = api_call("job_tasks/create", job_task => {
364     'job_uuid' => $Job->{'uuid'},
365     'sequence' => 0,
366     'qsequence' => 0,
367     'parameters' => {},
368   });
369   push @jobstep, { 'level' => 0,
370                    'failures' => 0,
371                    'arvados_task' => $first_task,
372                  };
373   push @jobstep_todo, 0;
374 }
375
376
377 if (!$have_slurm)
378 {
379   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
380 }
381
382 my $build_script = handle_readall(\*DATA);
383 my $nodelist = join(",", @node);
384 my $git_tar_count = 0;
385
386 if (!defined $no_clear_tmp) {
387   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
388   Log (undef, "Clean work dirs");
389
390   my $cleanpid = fork();
391   if ($cleanpid == 0)
392   {
393     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
394     # Then clean up work directories.
395     # TODO: When #5036 is done and widely deployed, we can limit mount's
396     # -t option to simply fuse.keep.
397     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
399     exit (1);
400   }
401   while (1)
402   {
403     last if $cleanpid == waitpid (-1, WNOHANG);
404     freeze_if_want_freeze ($cleanpid);
405     select (undef, undef, undef, 0.1);
406   }
407   if ($?) {
408     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
409     exit(EX_RETRY_UNLOCKED);
410   }
411 }
412
413 # If this job requires a Docker image, install that.
414 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
415 if ($docker_locator = $Job->{docker_image_locator}) {
416   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
417   if (!$docker_hash)
418   {
419     croak("No Docker image hash found from locator $docker_locator");
420   }
421   $docker_stream =~ s/^\.//;
422   my $docker_install_script = qq{
423 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
424     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
425 fi
426 };
427   my $docker_pid = fork();
428   if ($docker_pid == 0)
429   {
430     srun (["srun", "--nodelist=" . join(',', @node)],
431           ["/bin/sh", "-ec", $docker_install_script]);
432     exit ($?);
433   }
434   while (1)
435   {
436     last if $docker_pid == waitpid (-1, WNOHANG);
437     freeze_if_want_freeze ($docker_pid);
438     select (undef, undef, undef, 0.1);
439   }
440   if ($? != 0)
441   {
442     croak("Installing Docker image from $docker_locator exited "
443           .exit_status_s($?));
444   }
445
446   # Determine whether this version of Docker supports memory+swap limits.
447   srun(["srun", "--nodelist=" . $node[0]],
448        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
449       {fork => 1});
450   $docker_limitmem = ($? == 0);
451
452   # Find a non-root Docker user to use.
453   # Tries the default user for the container, then 'crunch', then 'nobody',
454   # testing for whether the actual user id is non-zero.  This defends against
455   # mistakes but not malice, but we intend to harden the security in the future
456   # so we don't want anyone getting used to their jobs running as root in their
457   # Docker containers.
458   my @tryusers = ("", "crunch", "nobody");
459   foreach my $try_user (@tryusers) {
460     my $try_user_arg;
461     if ($try_user eq "") {
462       Log(undef, "Checking if container default user is not UID 0");
463       $try_user_arg = "";
464     } else {
465       Log(undef, "Checking if user '$try_user' is not UID 0");
466       $try_user_arg = "--user=$try_user";
467     }
468     srun(["srun", "--nodelist=" . $node[0]],
469          ["/bin/sh", "-ec",
470           "a=`$docker_bin run $try_user_arg $docker_hash id --user` && " .
471           " test \$a -ne 0"],
472          {fork => 1});
473     if ($? == 0) {
474       $dockeruserarg = $try_user_arg;
475       if ($try_user eq "") {
476         Log(undef, "Container will run with default user");
477       } else {
478         Log(undef, "Container will run with $dockeruserarg");
479       }
480       last;
481     }
482   }
483
484   if (!defined $dockeruserarg) {
485     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
486   }
487
488   if ($Job->{arvados_sdk_version}) {
489     # The job also specifies an Arvados SDK version.  Add the SDKs to the
490     # tar file for the build script to install.
491     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
492                        $Job->{arvados_sdk_version}));
493     add_git_archive("git", "--git-dir=$git_dir", "archive",
494                     "--prefix=.arvados.sdk/",
495                     $Job->{arvados_sdk_version}, "sdk");
496   }
497 }
498
499 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
500   # If script_version looks like an absolute path, *and* the --git-dir
501   # argument was not given -- which implies we were not invoked by
502   # crunch-dispatch -- we will use the given path as a working
503   # directory instead of resolving script_version to a git commit (or
504   # doing anything else with git).
505   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
506   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
507 }
508 else {
509   # Resolve the given script_version to a git commit sha1. Also, if
510   # the repository is remote, clone it into our local filesystem: this
511   # ensures "git archive" will work, and is necessary to reliably
512   # resolve a symbolic script_version like "master^".
513   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
514
515   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
516
517   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
518
519   # If we're running under crunch-dispatch, it will have already
520   # pulled the appropriate source tree into its own repository, and
521   # given us that repo's path as $git_dir.
522   #
523   # If we're running a "local" job, we might have to fetch content
524   # from a remote repository.
525   #
526   # (Currently crunch-dispatch gives a local path with --git-dir, but
527   # we might as well accept URLs there too in case it changes its
528   # mind.)
529   my $repo = $git_dir || $Job->{'repository'};
530
531   # Repository can be remote or local. If remote, we'll need to fetch it
532   # to a local dir before doing `git log` et al.
533   my $repo_location;
534
535   if ($repo =~ m{://|^[^/]*:}) {
536     # $repo is a git url we can clone, like git:// or https:// or
537     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
538     # not recognized here because distinguishing that from a local
539     # path is too fragile. If you really need something strange here,
540     # use the ssh:// form.
541     $repo_location = 'remote';
542   } elsif ($repo =~ m{^\.*/}) {
543     # $repo is a local path to a git index. We'll also resolve ../foo
544     # to ../foo/.git if the latter is a directory. To help
545     # disambiguate local paths from named hosted repositories, this
546     # form must be given as ./ or ../ if it's a relative path.
547     if (-d "$repo/.git") {
548       $repo = "$repo/.git";
549     }
550     $repo_location = 'local';
551   } else {
552     # $repo is none of the above. It must be the name of a hosted
553     # repository.
554     my $arv_repo_list = api_call("repositories/list",
555                                  'filters' => [['name','=',$repo]]);
556     my @repos_found = @{$arv_repo_list->{'items'}};
557     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
558     if ($n_found > 0) {
559       Log(undef, "Repository '$repo' -> "
560           . join(", ", map { $_->{'uuid'} } @repos_found));
561     }
562     if ($n_found != 1) {
563       croak("Error: Found $n_found repositories with name '$repo'.");
564     }
565     $repo = $repos_found[0]->{'fetch_url'};
566     $repo_location = 'remote';
567   }
568   Log(undef, "Using $repo_location repository '$repo'");
569   $ENV{"CRUNCH_SRC_URL"} = $repo;
570
571   # Resolve given script_version (we'll call that $treeish here) to a
572   # commit sha1 ($commit).
573   my $treeish = $Job->{'script_version'};
574   my $commit;
575   if ($repo_location eq 'remote') {
576     # We minimize excess object-fetching by re-using the same bare
577     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
578     # just keep adding remotes to it as needed.
579     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
580     my $gitcmd = "git --git-dir=\Q$local_repo\E";
581
582     # Set up our local repo for caching remote objects, making
583     # archives, etc.
584     if (!-d $local_repo) {
585       make_path($local_repo) or croak("Error: could not create $local_repo");
586     }
587     # This works (exits 0 and doesn't delete fetched objects) even
588     # if $local_repo is already initialized:
589     `$gitcmd init --bare`;
590     if ($?) {
591       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
592     }
593
594     # If $treeish looks like a hash (or abbrev hash) we look it up in
595     # our local cache first, since that's cheaper. (We don't want to
596     # do that with tags/branches though -- those change over time, so
597     # they should always be resolved by the remote repo.)
598     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
599       # Hide stderr because it's normal for this to fail:
600       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
601       if ($? == 0 &&
602           # Careful not to resolve a branch named abcdeff to commit 1234567:
603           $sha1 =~ /^$treeish/ &&
604           $sha1 =~ /^([0-9a-f]{40})$/s) {
605         $commit = $1;
606         Log(undef, "Commit $commit already present in $local_repo");
607       }
608     }
609
610     if (!defined $commit) {
611       # If $treeish isn't just a hash or abbrev hash, or isn't here
612       # yet, we need to fetch the remote to resolve it correctly.
613
614       # First, remove all local heads. This prevents a name that does
615       # not exist on the remote from resolving to (or colliding with)
616       # a previously fetched branch or tag (possibly from a different
617       # remote).
618       remove_tree("$local_repo/refs/heads", {keep_root => 1});
619
620       Log(undef, "Fetching objects from $repo to $local_repo");
621       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
622       if ($?) {
623         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
624       }
625     }
626
627     # Now that the data is all here, we will use our local repo for
628     # the rest of our git activities.
629     $repo = $local_repo;
630   }
631
632   my $gitcmd = "git --git-dir=\Q$repo\E";
633   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
634   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
635     croak("`$gitcmd rev-list` exited "
636           .exit_status_s($?)
637           .", '$treeish' not found, giving up");
638   }
639   $commit = $1;
640   Log(undef, "Version $treeish is commit $commit");
641
642   if ($commit ne $Job->{'script_version'}) {
643     # Record the real commit id in the database, frozentokey, logs,
644     # etc. -- instead of an abbreviation or a branch name which can
645     # become ambiguous or point to a different commit in the future.
646     if (!$Job->update_attributes('script_version' => $commit)) {
647       croak("Error: failed to update job's script_version attribute");
648     }
649   }
650
651   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
652   add_git_archive("$gitcmd archive ''\Q$commit\E");
653 }
654
655 my $git_archive = combined_git_archive();
656 if (!defined $git_archive) {
657   Log(undef, "Skip install phase (no git archive)");
658   if ($have_slurm) {
659     Log(undef, "Warning: This probably means workers have no source tree!");
660   }
661 }
662 else {
663   my $install_exited;
664   my $install_script_tries_left = 3;
665   for (my $attempts = 0; $attempts < 3; $attempts++) {
666     Log(undef, "Run install script on all workers");
667
668     my @srunargs = ("srun",
669                     "--nodelist=$nodelist",
670                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
671     my @execargs = ("sh", "-c",
672                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
673
674     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
675     my ($install_stderr_r, $install_stderr_w);
676     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
677     set_nonblocking($install_stderr_r);
678     my $installpid = fork();
679     if ($installpid == 0)
680     {
681       close($install_stderr_r);
682       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
683       open(STDOUT, ">&", $install_stderr_w);
684       open(STDERR, ">&", $install_stderr_w);
685       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
686       exit (1);
687     }
688     close($install_stderr_w);
689     # Tell freeze_if_want_freeze how to kill the child, otherwise the
690     # "waitpid(installpid)" loop won't get interrupted by a freeze:
691     $proc{$installpid} = {};
692     my $stderr_buf = '';
693     # Track whether anything appears on stderr other than slurm errors
694     # ("srun: ...") and the "starting: ..." message printed by the
695     # srun subroutine itself:
696     my $stderr_anything_from_script = 0;
697     my $match_our_own_errors = '^(srun: error: |starting: \[)';
698     while ($installpid != waitpid(-1, WNOHANG)) {
699       freeze_if_want_freeze ($installpid);
700       # Wait up to 0.1 seconds for something to appear on stderr, then
701       # do a non-blocking read.
702       my $bits = fhbits($install_stderr_r);
703       select ($bits, undef, $bits, 0.1);
704       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
705       {
706         while ($stderr_buf =~ /^(.*?)\n/) {
707           my $line = $1;
708           substr $stderr_buf, 0, 1+length($line), "";
709           Log(undef, "stderr $line");
710           if ($line !~ /$match_our_own_errors/) {
711             $stderr_anything_from_script = 1;
712           }
713         }
714       }
715     }
716     delete $proc{$installpid};
717     $install_exited = $?;
718     close($install_stderr_r);
719     if (length($stderr_buf) > 0) {
720       if ($stderr_buf !~ /$match_our_own_errors/) {
721         $stderr_anything_from_script = 1;
722       }
723       Log(undef, "stderr $stderr_buf")
724     }
725
726     Log (undef, "Install script exited ".exit_status_s($install_exited));
727     last if $install_exited == 0 || $main::please_freeze;
728     # If the install script fails but doesn't print an error message,
729     # the next thing anyone is likely to do is just run it again in
730     # case it was a transient problem like "slurm communication fails
731     # because the network isn't reliable enough". So we'll just do
732     # that ourselves (up to 3 attempts in total). OTOH, if there is an
733     # error message, the problem is more likely to have a real fix and
734     # we should fail the job so the fixing process can start, instead
735     # of doing 2 more attempts.
736     last if $stderr_anything_from_script;
737   }
738
739   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
740     unlink($tar_filename);
741   }
742
743   if ($install_exited != 0) {
744     croak("Giving up");
745   }
746 }
747
748 foreach (qw (script script_version script_parameters runtime_constraints))
749 {
750   Log (undef,
751        "$_ " .
752        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
753 }
754 foreach (split (/\n/, $Job->{knobs}))
755 {
756   Log (undef, "knob " . $_);
757 }
758
759
760
761 $main::success = undef;
762
763
764
765 ONELEVEL:
766
767 my $thisround_succeeded = 0;
768 my $thisround_failed = 0;
769 my $thisround_failed_multiple = 0;
770 my $working_slot_count = scalar(@slot);
771
772 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
773                        or $a <=> $b } @jobstep_todo;
774 my $level = $jobstep[$jobstep_todo[0]]->{level};
775
776 my $initial_tasks_this_level = 0;
777 foreach my $id (@jobstep_todo) {
778   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
779 }
780
781 # If the number of tasks scheduled at this level #T is smaller than the number
782 # of slots available #S, only use the first #T slots, or the first slot on
783 # each node, whichever number is greater.
784 #
785 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
786 # based on these numbers.  Using fewer slots makes more resources available
787 # to each individual task, which should normally be a better strategy when
788 # there are fewer of them running with less parallelism.
789 #
790 # Note that this calculation is not redone if the initial tasks at
791 # this level queue more tasks at the same level.  This may harm
792 # overall task throughput for that level.
793 my @freeslot;
794 if ($initial_tasks_this_level < @node) {
795   @freeslot = (0..$#node);
796 } elsif ($initial_tasks_this_level < @slot) {
797   @freeslot = (0..$initial_tasks_this_level - 1);
798 } else {
799   @freeslot = (0..$#slot);
800 }
801 my $round_num_freeslots = scalar(@freeslot);
802
803 my %round_max_slots = ();
804 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
805   my $this_slot = $slot[$freeslot[$ii]];
806   my $node_name = $this_slot->{node}->{name};
807   $round_max_slots{$node_name} ||= $this_slot->{cpu};
808   last if (scalar(keys(%round_max_slots)) >= @node);
809 }
810
811 Log(undef, "start level $level with $round_num_freeslots slots");
812 my @holdslot;
813 my %reader;
814 my $progress_is_dirty = 1;
815 my $progress_stats_updated = 0;
816
817 update_progress_stats();
818
819
820 THISROUND:
821 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
822 {
823   # Don't create new tasks if we already know the job's final result.
824   last if defined($main::success);
825
826   my $id = $jobstep_todo[$todo_ptr];
827   my $Jobstep = $jobstep[$id];
828   if ($Jobstep->{level} != $level)
829   {
830     next;
831   }
832
833   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
834   set_nonblocking($reader{$id});
835
836   my $childslot = $freeslot[0];
837   my $childnode = $slot[$childslot]->{node};
838   my $childslotname = join (".",
839                             $slot[$childslot]->{node}->{name},
840                             $slot[$childslot]->{cpu});
841
842   my $childpid = fork();
843   if ($childpid == 0)
844   {
845     $SIG{'INT'} = 'DEFAULT';
846     $SIG{'QUIT'} = 'DEFAULT';
847     $SIG{'TERM'} = 'DEFAULT';
848
849     foreach (values (%reader))
850     {
851       close($_);
852     }
853     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
854     open(STDOUT,">&writer");
855     open(STDERR,">&writer");
856
857     undef $dbh;
858     undef $sth;
859
860     delete $ENV{"GNUPGHOME"};
861     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
862     $ENV{"TASK_QSEQUENCE"} = $id;
863     $ENV{"TASK_SEQUENCE"} = $level;
864     $ENV{"JOB_SCRIPT"} = $Job->{script};
865     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
866       $param =~ tr/a-z/A-Z/;
867       $ENV{"JOB_PARAMETER_$param"} = $value;
868     }
869     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
870     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
871     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
872     $ENV{"HOME"} = $ENV{"TASK_WORK"};
873     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
874     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
875     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
876     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
877
878     $ENV{"GZIP"} = "-n";
879
880     my @srunargs = (
881       "srun",
882       "--nodelist=".$childnode->{name},
883       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
884       "--job-name=$job_id.$id.$$",
885         );
886
887     my $stdbuf = " stdbuf --output=0 --error=0 ";
888
889     my $command =
890         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
891         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
892         ."&& cd $ENV{CRUNCH_TMP} "
893         # These environment variables get used explicitly later in
894         # $command.  No tool is expected to read these values directly.
895         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
896         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
897         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
898         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
899     $command .= "&& exec arv-mount --by-pdh --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
900     if ($docker_hash)
901     {
902       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
903       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
904       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
905       $command .= "$docker_bin run --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
906       # We only set memory limits if Docker lets us limit both memory and swap.
907       # Memory limits alone have been supported longer, but subprocesses tend
908       # to get SIGKILL if they exceed that without any swap limit set.
909       # See #5642 for additional background.
910       if ($docker_limitmem) {
911         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
912       }
913
914       # The source tree and $destdir directory (which we have
915       # installed on the worker host) are available in the container,
916       # under the same path.
917       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
918       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
919
920       # Currently, we make arv-mount's mount point appear at /keep
921       # inside the container (instead of using the same path as the
922       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
923       # crunch scripts and utilities must not rely on this. They must
924       # use $TASK_KEEPMOUNT.
925       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
926       $ENV{TASK_KEEPMOUNT} = "/keep";
927
928       # TASK_WORK is almost exactly like a docker data volume: it
929       # starts out empty, is writable, and persists until no
930       # containers use it any more. We don't use --volumes-from to
931       # share it with other containers: it is only accessible to this
932       # task, and it goes away when this task stops.
933       #
934       # However, a docker data volume is writable only by root unless
935       # the mount point already happens to exist in the container with
936       # different permissions. Therefore, we [1] assume /tmp already
937       # exists in the image and is writable by the crunch user; [2]
938       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
939       # writable if they are created by docker while setting up the
940       # other --volumes); and [3] create $TASK_WORK inside the
941       # container using $build_script.
942       $command .= "--volume=/tmp ";
943       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
944       $ENV{"HOME"} = $ENV{"TASK_WORK"};
945       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
946
947       # TODO: Share a single JOB_WORK volume across all task
948       # containers on a given worker node, and delete it when the job
949       # ends (and, in case that doesn't work, when the next job
950       # starts).
951       #
952       # For now, use the same approach as TASK_WORK above.
953       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
954
955       while (my ($env_key, $env_val) = each %ENV)
956       {
957         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
958           $command .= "--env=\Q$env_key=$env_val\E ";
959         }
960       }
961       $command .= "--env=\QHOME=$ENV{HOME}\E ";
962       $command .= "\Q$docker_hash\E ";
963
964       if ($Job->{arvados_sdk_version}) {
965         $command .= $stdbuf;
966         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
967       } else {
968         $command .= "/bin/sh -c \'mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
969             "if which stdbuf >/dev/null ; then " .
970             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
971             " else " .
972             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
973             " fi\'";
974       }
975     } else {
976       # Non-docker run
977       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
978       $command .= $stdbuf;
979       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
980     }
981
982     my @execargs = ('bash', '-c', $command);
983     srun (\@srunargs, \@execargs, undef, $build_script);
984     # exec() failed, we assume nothing happened.
985     die "srun() failed on build script\n";
986   }
987   close("writer");
988   if (!defined $childpid)
989   {
990     close $reader{$id};
991     delete $reader{$id};
992     next;
993   }
994   shift @freeslot;
995   $proc{$childpid} = { jobstep => $id,
996                        time => time,
997                        slot => $childslot,
998                        jobstepname => "$job_id.$id.$childpid",
999                      };
1000   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1001   $slot[$childslot]->{pid} = $childpid;
1002
1003   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1004   Log ($id, "child $childpid started on $childslotname");
1005   $Jobstep->{starttime} = time;
1006   $Jobstep->{node} = $childnode->{name};
1007   $Jobstep->{slotindex} = $childslot;
1008   delete $Jobstep->{stderr};
1009   delete $Jobstep->{finishtime};
1010   delete $Jobstep->{tempfail};
1011
1012   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1013   $Jobstep->{'arvados_task'}->save;
1014
1015   splice @jobstep_todo, $todo_ptr, 1;
1016   --$todo_ptr;
1017
1018   $progress_is_dirty = 1;
1019
1020   while (!@freeslot
1021          ||
1022          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1023   {
1024     last THISROUND if $main::please_freeze;
1025     if ($main::please_info)
1026     {
1027       $main::please_info = 0;
1028       freeze();
1029       create_output_collection();
1030       save_meta(1);
1031       update_progress_stats();
1032     }
1033     my $gotsome
1034         = readfrompipes ()
1035         + reapchildren ();
1036     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1037     {
1038       check_refresh_wanted();
1039       check_squeue();
1040       update_progress_stats();
1041       select (undef, undef, undef, 0.1);
1042     }
1043     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1044     {
1045       update_progress_stats();
1046     }
1047     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1048                                         $_->{node}->{hold_count} < 4 } @slot);
1049     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1050         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1051     {
1052       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1053           .($thisround_failed+$thisround_succeeded)
1054           .") -- giving up on this round";
1055       Log (undef, $message);
1056       last THISROUND;
1057     }
1058
1059     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1060     for (my $i=$#freeslot; $i>=0; $i--) {
1061       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1062         push @holdslot, (splice @freeslot, $i, 1);
1063       }
1064     }
1065     for (my $i=$#holdslot; $i>=0; $i--) {
1066       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1067         push @freeslot, (splice @holdslot, $i, 1);
1068       }
1069     }
1070
1071     # give up if no nodes are succeeding
1072     if ($working_slot_count < 1) {
1073       Log(undef, "Every node has failed -- giving up");
1074       last THISROUND;
1075     }
1076   }
1077 }
1078
1079
1080 push @freeslot, splice @holdslot;
1081 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1082
1083
1084 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1085 while (%proc)
1086 {
1087   if ($main::please_continue) {
1088     $main::please_continue = 0;
1089     goto THISROUND;
1090   }
1091   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1092   readfrompipes ();
1093   if (!reapchildren())
1094   {
1095     check_refresh_wanted();
1096     check_squeue();
1097     update_progress_stats();
1098     select (undef, undef, undef, 0.1);
1099     killem (keys %proc) if $main::please_freeze;
1100   }
1101 }
1102
1103 update_progress_stats();
1104 freeze_if_want_freeze();
1105
1106
1107 if (!defined $main::success)
1108 {
1109   if (!@jobstep_todo) {
1110     $main::success = 1;
1111   } elsif ($working_slot_count < 1) {
1112     save_output_collection();
1113     save_meta();
1114     exit(EX_RETRY_UNLOCKED);
1115   } elsif ($thisround_succeeded == 0 &&
1116            ($thisround_failed == 0 || $thisround_failed > 4)) {
1117     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1118     Log (undef, $message);
1119     $main::success = 0;
1120   }
1121 }
1122
1123 goto ONELEVEL if !defined $main::success;
1124
1125
1126 release_allocation();
1127 freeze();
1128 my $collated_output = save_output_collection();
1129 Log (undef, "finish");
1130
1131 save_meta();
1132
1133 my $final_state;
1134 if ($collated_output && $main::success) {
1135   $final_state = 'Complete';
1136 } else {
1137   $final_state = 'Failed';
1138 }
1139 $Job->update_attributes('state' => $final_state);
1140
1141 exit (($final_state eq 'Complete') ? 0 : 1);
1142
1143
1144
1145 sub update_progress_stats
1146 {
1147   $progress_stats_updated = time;
1148   return if !$progress_is_dirty;
1149   my ($todo, $done, $running) = (scalar @jobstep_todo,
1150                                  scalar @jobstep_done,
1151                                  scalar keys(%proc));
1152   $Job->{'tasks_summary'} ||= {};
1153   $Job->{'tasks_summary'}->{'todo'} = $todo;
1154   $Job->{'tasks_summary'}->{'done'} = $done;
1155   $Job->{'tasks_summary'}->{'running'} = $running;
1156   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1157   Log (undef, "status: $done done, $running running, $todo todo");
1158   $progress_is_dirty = 0;
1159 }
1160
1161
1162
1163 sub reapchildren
1164 {
1165   my $pid = waitpid (-1, WNOHANG);
1166   return 0 if $pid <= 0;
1167
1168   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1169                   . "."
1170                   . $slot[$proc{$pid}->{slot}]->{cpu});
1171   my $jobstepid = $proc{$pid}->{jobstep};
1172   my $elapsed = time - $proc{$pid}->{time};
1173   my $Jobstep = $jobstep[$jobstepid];
1174
1175   my $childstatus = $?;
1176   my $exitvalue = $childstatus >> 8;
1177   my $exitinfo = "exit ".exit_status_s($childstatus);
1178   $Jobstep->{'arvados_task'}->reload;
1179   my $task_success = $Jobstep->{'arvados_task'}->{success};
1180
1181   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1182
1183   if (!defined $task_success) {
1184     # task did not indicate one way or the other --> fail
1185     $Jobstep->{'arvados_task'}->{success} = 0;
1186     $Jobstep->{'arvados_task'}->save;
1187     $task_success = 0;
1188   }
1189
1190   if (!$task_success)
1191   {
1192     my $temporary_fail;
1193     $temporary_fail ||= $Jobstep->{tempfail};
1194     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1195
1196     ++$thisround_failed;
1197     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1198
1199     # Check for signs of a failed or misconfigured node
1200     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1201         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1202       # Don't count this against jobstep failure thresholds if this
1203       # node is already suspected faulty and srun exited quickly
1204       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1205           $elapsed < 5) {
1206         Log ($jobstepid, "blaming failure on suspect node " .
1207              $slot[$proc{$pid}->{slot}]->{node}->{name});
1208         $temporary_fail ||= 1;
1209       }
1210       ban_node_by_slot($proc{$pid}->{slot});
1211     }
1212
1213     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1214                              ++$Jobstep->{'failures'},
1215                              $temporary_fail ? 'temporary' : 'permanent',
1216                              $elapsed));
1217
1218     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1219       # Give up on this task, and the whole job
1220       $main::success = 0;
1221     }
1222     # Put this task back on the todo queue
1223     push @jobstep_todo, $jobstepid;
1224     $Job->{'tasks_summary'}->{'failed'}++;
1225   }
1226   else
1227   {
1228     ++$thisround_succeeded;
1229     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1230     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1231     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1232     push @jobstep_done, $jobstepid;
1233     Log ($jobstepid, "success in $elapsed seconds");
1234   }
1235   $Jobstep->{exitcode} = $childstatus;
1236   $Jobstep->{finishtime} = time;
1237   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1238   $Jobstep->{'arvados_task'}->save;
1239   process_stderr ($jobstepid, $task_success);
1240   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1241                            length($Jobstep->{'arvados_task'}->{output}),
1242                            $Jobstep->{'arvados_task'}->{output}));
1243
1244   close $reader{$jobstepid};
1245   delete $reader{$jobstepid};
1246   delete $slot[$proc{$pid}->{slot}]->{pid};
1247   push @freeslot, $proc{$pid}->{slot};
1248   delete $proc{$pid};
1249
1250   if ($task_success) {
1251     # Load new tasks
1252     my $newtask_list = [];
1253     my $newtask_results;
1254     do {
1255       $newtask_results = api_call(
1256         "job_tasks/list",
1257         'where' => {
1258           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1259         },
1260         'order' => 'qsequence',
1261         'offset' => scalar(@$newtask_list),
1262       );
1263       push(@$newtask_list, @{$newtask_results->{items}});
1264     } while (@{$newtask_results->{items}});
1265     foreach my $arvados_task (@$newtask_list) {
1266       my $jobstep = {
1267         'level' => $arvados_task->{'sequence'},
1268         'failures' => 0,
1269         'arvados_task' => $arvados_task
1270       };
1271       push @jobstep, $jobstep;
1272       push @jobstep_todo, $#jobstep;
1273     }
1274   }
1275
1276   $progress_is_dirty = 1;
1277   1;
1278 }
1279
1280 sub check_refresh_wanted
1281 {
1282   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1283   if (@stat && $stat[9] > $latest_refresh) {
1284     $latest_refresh = scalar time;
1285     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1286     for my $attr ('cancelled_at',
1287                   'cancelled_by_user_uuid',
1288                   'cancelled_by_client_uuid',
1289                   'state') {
1290       $Job->{$attr} = $Job2->{$attr};
1291     }
1292     if ($Job->{'state'} ne "Running") {
1293       if ($Job->{'state'} eq "Cancelled") {
1294         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1295       } else {
1296         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1297       }
1298       $main::success = 0;
1299       $main::please_freeze = 1;
1300     }
1301   }
1302 }
1303
1304 sub check_squeue
1305 {
1306   my $last_squeue_check = $squeue_checked;
1307
1308   # Do not call `squeue` or check the kill list more than once every
1309   # 15 seconds.
1310   return if $last_squeue_check > time - 15;
1311   $squeue_checked = time;
1312
1313   # Look for children from which we haven't received stderr data since
1314   # the last squeue check. If no such children exist, all procs are
1315   # alive and there's no need to even look at squeue.
1316   #
1317   # As long as the crunchstat poll interval (10s) is shorter than the
1318   # squeue check interval (15s) this should make the squeue check an
1319   # infrequent event.
1320   my $silent_procs = 0;
1321   for my $jobstep (values %proc)
1322   {
1323     if ($jobstep->{stderr_at} < $last_squeue_check)
1324     {
1325       $silent_procs++;
1326     }
1327   }
1328   return if $silent_procs == 0;
1329
1330   # use killem() on procs whose killtime is reached
1331   while (my ($pid, $jobstep) = each %proc)
1332   {
1333     if (exists $jobstep->{killtime}
1334         && $jobstep->{killtime} <= time
1335         && $jobstep->{stderr_at} < $last_squeue_check)
1336     {
1337       my $sincewhen = "";
1338       if ($jobstep->{stderr_at}) {
1339         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1340       }
1341       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1342       killem ($pid);
1343     }
1344   }
1345
1346   if (!$have_slurm)
1347   {
1348     # here is an opportunity to check for mysterious problems with local procs
1349     return;
1350   }
1351
1352   # Get a list of steps still running.  Note: squeue(1) says --steps
1353   # selects a format (which we override anyway) and allows us to
1354   # specify which steps we're interested in (which we don't).
1355   # Importantly, it also changes the meaning of %j from "job name" to
1356   # "step name" and (although this isn't mentioned explicitly in the
1357   # docs) switches from "one line per job" mode to "one line per step"
1358   # mode. Without it, we'd just get a list of one job, instead of a
1359   # list of N steps.
1360   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1361   if ($? != 0)
1362   {
1363     Log(undef, "warning: squeue exit status $? ($!)");
1364     return;
1365   }
1366   chop @squeue;
1367
1368   # which of my jobsteps are running, according to squeue?
1369   my %ok;
1370   for my $jobstepname (@squeue)
1371   {
1372     $ok{$jobstepname} = 1;
1373   }
1374
1375   # Check for child procs >60s old and not mentioned by squeue.
1376   while (my ($pid, $jobstep) = each %proc)
1377   {
1378     if ($jobstep->{time} < time - 60
1379         && $jobstep->{jobstepname}
1380         && !exists $ok{$jobstep->{jobstepname}}
1381         && !exists $jobstep->{killtime})
1382     {
1383       # According to slurm, this task has ended (successfully or not)
1384       # -- but our srun child hasn't exited. First we must wait (30
1385       # seconds) in case this is just a race between communication
1386       # channels. Then, if our srun child process still hasn't
1387       # terminated, we'll conclude some slurm communication
1388       # error/delay has caused the task to die without notifying srun,
1389       # and we'll kill srun ourselves.
1390       $jobstep->{killtime} = time + 30;
1391       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1392     }
1393   }
1394 }
1395
1396
1397 sub release_allocation
1398 {
1399   if ($have_slurm)
1400   {
1401     Log (undef, "release job allocation");
1402     system "scancel $ENV{SLURM_JOB_ID}";
1403   }
1404 }
1405
1406
1407 sub readfrompipes
1408 {
1409   my $gotsome = 0;
1410   foreach my $job (keys %reader)
1411   {
1412     my $buf;
1413     while (0 < sysread ($reader{$job}, $buf, 8192))
1414     {
1415       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1416       $jobstep[$job]->{stderr_at} = time;
1417       $jobstep[$job]->{stderr} .= $buf;
1418       preprocess_stderr ($job);
1419       if (length ($jobstep[$job]->{stderr}) > 16384)
1420       {
1421         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1422       }
1423       $gotsome = 1;
1424     }
1425   }
1426   return $gotsome;
1427 }
1428
1429
1430 sub preprocess_stderr
1431 {
1432   my $job = shift;
1433
1434   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1435     my $line = $1;
1436     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1437     Log ($job, "stderr $line");
1438     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1439       # whoa.
1440       $main::please_freeze = 1;
1441     }
1442     elsif ($line =~ /srun: error: Node failure on/) {
1443       my $job_slot_index = $jobstep[$job]->{slotindex};
1444       $slot[$job_slot_index]->{node}->{fail_count}++;
1445       $jobstep[$job]->{tempfail} = 1;
1446       ban_node_by_slot($job_slot_index);
1447     }
1448     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1449       $jobstep[$job]->{tempfail} = 1;
1450       ban_node_by_slot($jobstep[$job]->{slotindex});
1451     }
1452     elsif ($line =~ /arvados\.errors\.Keep/) {
1453       $jobstep[$job]->{tempfail} = 1;
1454     }
1455   }
1456 }
1457
1458
1459 sub process_stderr
1460 {
1461   my $job = shift;
1462   my $task_success = shift;
1463   preprocess_stderr ($job);
1464
1465   map {
1466     Log ($job, "stderr $_");
1467   } split ("\n", $jobstep[$job]->{stderr});
1468 }
1469
1470 sub fetch_block
1471 {
1472   my $hash = shift;
1473   my $keep;
1474   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1475     Log(undef, "fetch_block run error from arv-get $hash: $!");
1476     return undef;
1477   }
1478   my $output_block = "";
1479   while (1) {
1480     my $buf;
1481     my $bytes = sysread($keep, $buf, 1024 * 1024);
1482     if (!defined $bytes) {
1483       Log(undef, "fetch_block read error from arv-get: $!");
1484       $output_block = undef;
1485       last;
1486     } elsif ($bytes == 0) {
1487       # sysread returns 0 at the end of the pipe.
1488       last;
1489     } else {
1490       # some bytes were read into buf.
1491       $output_block .= $buf;
1492     }
1493   }
1494   close $keep;
1495   if ($?) {
1496     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1497     $output_block = undef;
1498   }
1499   return $output_block;
1500 }
1501
1502 # Create a collection by concatenating the output of all tasks (each
1503 # task's output is either a manifest fragment, a locator for a
1504 # manifest fragment stored in Keep, or nothing at all). Return the
1505 # portable_data_hash of the new collection.
1506 sub create_output_collection
1507 {
1508   Log (undef, "collate");
1509
1510   my ($child_out, $child_in);
1511   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1512 import arvados
1513 import sys
1514 print (arvados.api("v1").collections().
1515        create(body={"manifest_text": sys.stdin.read()}).
1516        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1517 }, retry_count());
1518
1519   my $task_idx = -1;
1520   my $manifest_size = 0;
1521   for (@jobstep)
1522   {
1523     ++$task_idx;
1524     my $output = $_->{'arvados_task'}->{output};
1525     next if (!defined($output));
1526     my $next_write;
1527     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1528       $next_write = fetch_block($output);
1529     } else {
1530       $next_write = $output;
1531     }
1532     if (defined($next_write)) {
1533       if (!defined(syswrite($child_in, $next_write))) {
1534         # There's been an error writing.  Stop the loop.
1535         # We'll log details about the exit code later.
1536         last;
1537       } else {
1538         $manifest_size += length($next_write);
1539       }
1540     } else {
1541       my $uuid = $_->{'arvados_task'}->{'uuid'};
1542       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1543       $main::success = 0;
1544     }
1545   }
1546   close($child_in);
1547   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1548
1549   my $joboutput;
1550   my $s = IO::Select->new($child_out);
1551   if ($s->can_read(120)) {
1552     sysread($child_out, $joboutput, 1024 * 1024);
1553     waitpid($pid, 0);
1554     if ($?) {
1555       Log(undef, "output collection creation exited " . exit_status_s($?));
1556       $joboutput = undef;
1557     } else {
1558       chomp($joboutput);
1559     }
1560   } else {
1561     Log (undef, "timed out while creating output collection");
1562     foreach my $signal (2, 2, 2, 15, 15, 9) {
1563       kill($signal, $pid);
1564       last if waitpid($pid, WNOHANG) == -1;
1565       sleep(1);
1566     }
1567   }
1568   close($child_out);
1569
1570   return $joboutput;
1571 }
1572
1573 # Calls create_output_collection, logs the result, and returns it.
1574 # If that was successful, save that as the output in the job record.
1575 sub save_output_collection {
1576   my $collated_output = create_output_collection();
1577
1578   if (!$collated_output) {
1579     Log(undef, "Failed to write output collection");
1580   }
1581   else {
1582     Log(undef, "job output $collated_output");
1583     $Job->update_attributes('output' => $collated_output);
1584   }
1585   return $collated_output;
1586 }
1587
1588 sub killem
1589 {
1590   foreach (@_)
1591   {
1592     my $sig = 2;                # SIGINT first
1593     if (exists $proc{$_}->{"sent_$sig"} &&
1594         time - $proc{$_}->{"sent_$sig"} > 4)
1595     {
1596       $sig = 15;                # SIGTERM if SIGINT doesn't work
1597     }
1598     if (exists $proc{$_}->{"sent_$sig"} &&
1599         time - $proc{$_}->{"sent_$sig"} > 4)
1600     {
1601       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1602     }
1603     if (!exists $proc{$_}->{"sent_$sig"})
1604     {
1605       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1606       kill $sig, $_;
1607       select (undef, undef, undef, 0.1);
1608       if ($sig == 2)
1609       {
1610         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1611       }
1612       $proc{$_}->{"sent_$sig"} = time;
1613       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1614     }
1615   }
1616 }
1617
1618
1619 sub fhbits
1620 {
1621   my($bits);
1622   for (@_) {
1623     vec($bits,fileno($_),1) = 1;
1624   }
1625   $bits;
1626 }
1627
1628
1629 # Send log output to Keep via arv-put.
1630 #
1631 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1632 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1633 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1634 # $log_pipe_pid is the pid of the arv-put subprocess.
1635 #
1636 # The only functions that should access these variables directly are:
1637 #
1638 # log_writer_start($logfilename)
1639 #     Starts an arv-put pipe, reading data on stdin and writing it to
1640 #     a $logfilename file in an output collection.
1641 #
1642 # log_writer_read_output([$timeout])
1643 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1644 #     Passes $timeout to the select() call, with a default of 0.01.
1645 #     Returns the result of the last read() call on $log_pipe_out, or
1646 #     -1 if read() wasn't called because select() timed out.
1647 #     Only other log_writer_* functions should need to call this.
1648 #
1649 # log_writer_send($txt)
1650 #     Writes $txt to the output log collection.
1651 #
1652 # log_writer_finish()
1653 #     Closes the arv-put pipe and returns the output that it produces.
1654 #
1655 # log_writer_is_active()
1656 #     Returns a true value if there is currently a live arv-put
1657 #     process, false otherwise.
1658 #
1659 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1660     $log_pipe_pid);
1661
1662 sub log_writer_start($)
1663 {
1664   my $logfilename = shift;
1665   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1666                         'arv-put',
1667                         '--stream',
1668                         '--retries', '3',
1669                         '--filename', $logfilename,
1670                         '-');
1671   $log_pipe_out_buf = "";
1672   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1673 }
1674
1675 sub log_writer_read_output {
1676   my $timeout = shift || 0.01;
1677   my $read = -1;
1678   while ($read && $log_pipe_out_select->can_read($timeout)) {
1679     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1680                  length($log_pipe_out_buf));
1681   }
1682   if (!defined($read)) {
1683     Log(undef, "error reading log manifest from arv-put: $!");
1684   }
1685   return $read;
1686 }
1687
1688 sub log_writer_send($)
1689 {
1690   my $txt = shift;
1691   print $log_pipe_in $txt;
1692   log_writer_read_output();
1693 }
1694
1695 sub log_writer_finish()
1696 {
1697   return unless $log_pipe_pid;
1698
1699   close($log_pipe_in);
1700
1701   my $logger_failed = 0;
1702   my $read_result = log_writer_read_output(120);
1703   if ($read_result == -1) {
1704     $logger_failed = -1;
1705     Log (undef, "timed out reading from 'arv-put'");
1706   } elsif ($read_result != 0) {
1707     $logger_failed = -2;
1708     Log(undef, "failed to read arv-put log manifest to EOF");
1709   }
1710
1711   waitpid($log_pipe_pid, 0);
1712   if ($?) {
1713     $logger_failed ||= $?;
1714     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1715   }
1716
1717   close($log_pipe_out);
1718   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1719   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1720       $log_pipe_out_select = undef;
1721
1722   return $arv_put_output;
1723 }
1724
1725 sub log_writer_is_active() {
1726   return $log_pipe_pid;
1727 }
1728
1729 sub Log                         # ($jobstep_id, $logmessage)
1730 {
1731   if ($_[1] =~ /\n/) {
1732     for my $line (split (/\n/, $_[1])) {
1733       Log ($_[0], $line);
1734     }
1735     return;
1736   }
1737   my $fh = select STDERR; $|=1; select $fh;
1738   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1739   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1740   $message .= "\n";
1741   my $datetime;
1742   if (log_writer_is_active() || -t STDERR) {
1743     my @gmtime = gmtime;
1744     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1745                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1746   }
1747   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1748
1749   if (log_writer_is_active()) {
1750     log_writer_send($datetime . " " . $message);
1751   }
1752 }
1753
1754
1755 sub croak
1756 {
1757   my ($package, $file, $line) = caller;
1758   my $message = "@_ at $file line $line\n";
1759   Log (undef, $message);
1760   freeze() if @jobstep_todo;
1761   create_output_collection() if @jobstep_todo;
1762   cleanup();
1763   save_meta();
1764   die;
1765 }
1766
1767
1768 sub cleanup
1769 {
1770   return unless $Job;
1771   if ($Job->{'state'} eq 'Cancelled') {
1772     $Job->update_attributes('finished_at' => scalar gmtime);
1773   } else {
1774     $Job->update_attributes('state' => 'Failed');
1775   }
1776 }
1777
1778
1779 sub save_meta
1780 {
1781   my $justcheckpoint = shift; # false if this will be the last meta saved
1782   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1783   return unless log_writer_is_active();
1784   my $log_manifest = log_writer_finish();
1785   return unless defined($log_manifest);
1786
1787   if ($Job->{log}) {
1788     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1789     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1790   }
1791
1792   my $log_coll = api_call(
1793     "collections/create", ensure_unique_name => 1, collection => {
1794       manifest_text => $log_manifest,
1795       owner_uuid => $Job->{owner_uuid},
1796       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1797     });
1798   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1799   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1800 }
1801
1802
1803 sub freeze_if_want_freeze
1804 {
1805   if ($main::please_freeze)
1806   {
1807     release_allocation();
1808     if (@_)
1809     {
1810       # kill some srun procs before freeze+stop
1811       map { $proc{$_} = {} } @_;
1812       while (%proc)
1813       {
1814         killem (keys %proc);
1815         select (undef, undef, undef, 0.1);
1816         my $died;
1817         while (($died = waitpid (-1, WNOHANG)) > 0)
1818         {
1819           delete $proc{$died};
1820         }
1821       }
1822     }
1823     freeze();
1824     create_output_collection();
1825     cleanup();
1826     save_meta();
1827     exit 1;
1828   }
1829 }
1830
1831
1832 sub freeze
1833 {
1834   Log (undef, "Freeze not implemented");
1835   return;
1836 }
1837
1838
1839 sub thaw
1840 {
1841   croak ("Thaw not implemented");
1842 }
1843
1844
1845 sub freezequote
1846 {
1847   my $s = shift;
1848   $s =~ s/\\/\\\\/g;
1849   $s =~ s/\n/\\n/g;
1850   return $s;
1851 }
1852
1853
1854 sub freezeunquote
1855 {
1856   my $s = shift;
1857   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1858   return $s;
1859 }
1860
1861
1862 sub srun
1863 {
1864   my $srunargs = shift;
1865   my $execargs = shift;
1866   my $opts = shift || {};
1867   my $stdin = shift;
1868   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1869
1870   $Data::Dumper::Terse = 1;
1871   $Data::Dumper::Indent = 0;
1872   my $show_cmd = Dumper($args);
1873   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1874   $show_cmd =~ s/\n/ /g;
1875   if ($opts->{fork}) {
1876     Log(undef, "starting: $show_cmd");
1877   } else {
1878     # This is a child process: parent is in charge of reading our
1879     # stderr and copying it to Log() if needed.
1880     warn "starting: $show_cmd\n";
1881   }
1882
1883   if (defined $stdin) {
1884     my $child = open STDIN, "-|";
1885     defined $child or die "no fork: $!";
1886     if ($child == 0) {
1887       print $stdin or die $!;
1888       close STDOUT or die $!;
1889       exit 0;
1890     }
1891   }
1892
1893   return system (@$args) if $opts->{fork};
1894
1895   exec @$args;
1896   warn "ENV size is ".length(join(" ",%ENV));
1897   die "exec failed: $!: @$args";
1898 }
1899
1900
1901 sub ban_node_by_slot {
1902   # Don't start any new jobsteps on this node for 60 seconds
1903   my $slotid = shift;
1904   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1905   $slot[$slotid]->{node}->{hold_count}++;
1906   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1907 }
1908
1909 sub must_lock_now
1910 {
1911   my ($lockfile, $error_message) = @_;
1912   open L, ">", $lockfile or croak("$lockfile: $!");
1913   if (!flock L, LOCK_EX|LOCK_NB) {
1914     croak("Can't lock $lockfile: $error_message\n");
1915   }
1916 }
1917
1918 sub find_docker_image {
1919   # Given a Keep locator, check to see if it contains a Docker image.
1920   # If so, return its stream name and Docker hash.
1921   # If not, return undef for both values.
1922   my $locator = shift;
1923   my ($streamname, $filename);
1924   my $image = api_call("collections/get", uuid => $locator);
1925   if ($image) {
1926     foreach my $line (split(/\n/, $image->{manifest_text})) {
1927       my @tokens = split(/\s+/, $line);
1928       next if (!@tokens);
1929       $streamname = shift(@tokens);
1930       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1931         if (defined($filename)) {
1932           return (undef, undef);  # More than one file in the Collection.
1933         } else {
1934           $filename = (split(/:/, $filedata, 3))[2];
1935         }
1936       }
1937     }
1938   }
1939   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1940     return ($streamname, $1);
1941   } else {
1942     return (undef, undef);
1943   }
1944 }
1945
1946 sub retry_count {
1947   # Calculate the number of times an operation should be retried,
1948   # assuming exponential backoff, and that we're willing to retry as
1949   # long as tasks have been running.  Enforce a minimum of 3 retries.
1950   my ($starttime, $endtime, $timediff, $retries);
1951   if (@jobstep) {
1952     $starttime = $jobstep[0]->{starttime};
1953     $endtime = $jobstep[-1]->{finishtime};
1954   }
1955   if (!defined($starttime)) {
1956     $timediff = 0;
1957   } elsif (!defined($endtime)) {
1958     $timediff = time - $starttime;
1959   } else {
1960     $timediff = ($endtime - $starttime) - (time - $endtime);
1961   }
1962   if ($timediff > 0) {
1963     $retries = int(log($timediff) / log(2));
1964   } else {
1965     $retries = 1;  # Use the minimum.
1966   }
1967   return ($retries > 3) ? $retries : 3;
1968 }
1969
1970 sub retry_op {
1971   # Pass in two function references.
1972   # This method will be called with the remaining arguments.
1973   # If it dies, retry it with exponential backoff until it succeeds,
1974   # or until the current retry_count is exhausted.  After each failure
1975   # that can be retried, the second function will be called with
1976   # the current try count (0-based), next try time, and error message.
1977   my $operation = shift;
1978   my $retry_callback = shift;
1979   my $retries = retry_count();
1980   foreach my $try_count (0..$retries) {
1981     my $next_try = time + (2 ** $try_count);
1982     my $result = eval { $operation->(@_); };
1983     if (!$@) {
1984       return $result;
1985     } elsif ($try_count < $retries) {
1986       $retry_callback->($try_count, $next_try, $@);
1987       my $sleep_time = $next_try - time;
1988       sleep($sleep_time) if ($sleep_time > 0);
1989     }
1990   }
1991   # Ensure the error message ends in a newline, so Perl doesn't add
1992   # retry_op's line number to it.
1993   chomp($@);
1994   die($@ . "\n");
1995 }
1996
1997 sub api_call {
1998   # Pass in a /-separated API method name, and arguments for it.
1999   # This function will call that method, retrying as needed until
2000   # the current retry_count is exhausted, with a log on the first failure.
2001   my $method_name = shift;
2002   my $log_api_retry = sub {
2003     my ($try_count, $next_try_at, $errmsg) = @_;
2004     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2005     $errmsg =~ s/\s/ /g;
2006     $errmsg =~ s/\s+$//;
2007     my $retry_msg;
2008     if ($next_try_at < time) {
2009       $retry_msg = "Retrying.";
2010     } else {
2011       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2012       $retry_msg = "Retrying at $next_try_fmt.";
2013     }
2014     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2015   };
2016   my $method = $arv;
2017   foreach my $key (split(/\//, $method_name)) {
2018     $method = $method->{$key};
2019   }
2020   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2021 }
2022
2023 sub exit_status_s {
2024   # Given a $?, return a human-readable exit code string like "0" or
2025   # "1" or "0 with signal 1" or "1 with signal 11".
2026   my $exitcode = shift;
2027   my $s = $exitcode >> 8;
2028   if ($exitcode & 0x7f) {
2029     $s .= " with signal " . ($exitcode & 0x7f);
2030   }
2031   if ($exitcode & 0x80) {
2032     $s .= " with core dump";
2033   }
2034   return $s;
2035 }
2036
2037 sub handle_readall {
2038   # Pass in a glob reference to a file handle.
2039   # Read all its contents and return them as a string.
2040   my $fh_glob_ref = shift;
2041   local $/ = undef;
2042   return <$fh_glob_ref>;
2043 }
2044
2045 sub tar_filename_n {
2046   my $n = shift;
2047   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2048 }
2049
2050 sub add_git_archive {
2051   # Pass in a git archive command as a string or list, a la system().
2052   # This method will save its output to be included in the archive sent to the
2053   # build script.
2054   my $git_input;
2055   $git_tar_count++;
2056   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2057     croak("Failed to save git archive: $!");
2058   }
2059   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2060   close($git_input);
2061   waitpid($git_pid, 0);
2062   close(GIT_ARCHIVE);
2063   if ($?) {
2064     croak("Failed to save git archive: git exited " . exit_status_s($?));
2065   }
2066 }
2067
2068 sub combined_git_archive {
2069   # Combine all saved tar archives into a single archive, then return its
2070   # contents in a string.  Return undef if no archives have been saved.
2071   if ($git_tar_count < 1) {
2072     return undef;
2073   }
2074   my $base_tar_name = tar_filename_n(1);
2075   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2076     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2077     if ($tar_exit != 0) {
2078       croak("Error preparing build archive: tar -A exited " .
2079             exit_status_s($tar_exit));
2080     }
2081   }
2082   if (!open(GIT_TAR, "<", $base_tar_name)) {
2083     croak("Could not open build archive: $!");
2084   }
2085   my $tar_contents = handle_readall(\*GIT_TAR);
2086   close(GIT_TAR);
2087   return $tar_contents;
2088 }
2089
2090 sub set_nonblocking {
2091   my $fh = shift;
2092   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2093   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2094 }
2095
2096 __DATA__
2097 #!/usr/bin/env perl
2098 #
2099 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2100 # server invokes this script on individual compute nodes, or localhost if we're
2101 # running a job locally.  It gets called in two modes:
2102 #
2103 # * No arguments: Installation mode.  Read a tar archive from the DATA
2104 #   file handle; it includes the Crunch script's source code, and
2105 #   maybe SDKs as well.  Those should be installed in the proper
2106 #   locations.  This runs outside of any Docker container, so don't try to
2107 #   introspect Crunch's runtime environment.
2108 #
2109 # * With arguments: Crunch script run mode.  This script should set up the
2110 #   environment, then run the command specified in the arguments.  This runs
2111 #   inside any Docker container.
2112
2113 use Fcntl ':flock';
2114 use File::Path qw( make_path remove_tree );
2115 use POSIX qw(getcwd);
2116
2117 use constant TASK_TEMPFAIL => 111;
2118
2119 # Map SDK subdirectories to the path environments they belong to.
2120 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2121
2122 my $destdir = $ENV{"CRUNCH_SRC"};
2123 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2124 my $repo = $ENV{"CRUNCH_SRC_URL"};
2125 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2126 my $job_work = $ENV{"JOB_WORK"};
2127 my $task_work = $ENV{"TASK_WORK"};
2128
2129 open(STDOUT_ORIG, ">&", STDOUT);
2130 open(STDERR_ORIG, ">&", STDERR);
2131
2132 for my $dir ($destdir, $job_work, $task_work) {
2133   if ($dir) {
2134     make_path $dir;
2135     -e $dir or die "Failed to create temporary directory ($dir): $!";
2136   }
2137 }
2138
2139 if ($task_work) {
2140   remove_tree($task_work, {keep_root => 1});
2141 }
2142
2143 ### Crunch script run mode
2144 if (@ARGV) {
2145   # We want to do routine logging during task 0 only.  This gives the user
2146   # the information they need, but avoids repeating the information for every
2147   # task.
2148   my $Log;
2149   if ($ENV{TASK_SEQUENCE} eq "0") {
2150     $Log = sub {
2151       my $msg = shift;
2152       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2153     };
2154   } else {
2155     $Log = sub { };
2156   }
2157
2158   my $python_src = "$install_dir/python";
2159   my $venv_dir = "$job_work/.arvados.venv";
2160   my $venv_built = -e "$venv_dir/bin/activate";
2161   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2162     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2163                  "--python=python2.7", $venv_dir);
2164     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2165     $venv_built = 1;
2166     $Log->("Built Python SDK virtualenv");
2167   }
2168
2169   my $pip_bin = "pip";
2170   if ($venv_built) {
2171     $Log->("Running in Python SDK virtualenv");
2172     $pip_bin = "$venv_dir/bin/pip";
2173     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2174     @ARGV = ("/bin/sh", "-ec",
2175              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2176   } elsif (-d $python_src) {
2177     $Log->("Warning: virtualenv not found inside Docker container default " .
2178            "\$PATH. Can't install Python SDK.");
2179   }
2180
2181   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2182   if ($pkgs) {
2183     $Log->("Using Arvados SDK:");
2184     foreach my $line (split /\n/, $pkgs) {
2185       $Log->($line);
2186     }
2187   } else {
2188     $Log->("Arvados SDK packages not found");
2189   }
2190
2191   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2192     my $sdk_path = "$install_dir/$sdk_dir";
2193     if (-d $sdk_path) {
2194       if ($ENV{$sdk_envkey}) {
2195         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2196       } else {
2197         $ENV{$sdk_envkey} = $sdk_path;
2198       }
2199       $Log->("Arvados SDK added to %s", $sdk_envkey);
2200     }
2201   }
2202
2203   exec(@ARGV);
2204   die "Cannot exec `@ARGV`: $!";
2205 }
2206
2207 ### Installation mode
2208 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2209 flock L, LOCK_EX;
2210 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2211   # This exact git archive (source + arvados sdk) is already installed
2212   # here, so there's no need to reinstall it.
2213
2214   # We must consume our DATA section, though: otherwise the process
2215   # feeding it to us will get SIGPIPE.
2216   my $buf;
2217   while (read(DATA, $buf, 65536)) { }
2218
2219   exit(0);
2220 }
2221
2222 unlink "$destdir.archive_hash";
2223 mkdir $destdir;
2224
2225 do {
2226   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2227   local $SIG{PIPE} = "IGNORE";
2228   warn "Extracting archive: $archive_hash\n";
2229   # --ignore-zeros is necessary sometimes: depending on how much NUL
2230   # padding tar -A put on our combined archive (which in turn depends
2231   # on the length of the component archives) tar without
2232   # --ignore-zeros will exit before consuming stdin and cause close()
2233   # to fail on the resulting SIGPIPE.
2234   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2235     die "Error launching 'tar -xC $destdir': $!";
2236   }
2237   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2238   # get SIGPIPE.  We must feed it data incrementally.
2239   my $tar_input;
2240   while (read(DATA, $tar_input, 65536)) {
2241     print TARX $tar_input;
2242   }
2243   if(!close(TARX)) {
2244     die "'tar -xC $destdir' exited $?: $!";
2245   }
2246 };
2247
2248 mkdir $install_dir;
2249
2250 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2251 if (-d $sdk_root) {
2252   foreach my $sdk_lang (("python",
2253                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2254     if (-d "$sdk_root/$sdk_lang") {
2255       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2256         die "Failed to install $sdk_lang SDK: $!";
2257       }
2258     }
2259   }
2260 }
2261
2262 my $python_dir = "$install_dir/python";
2263 if ((-d $python_dir) and can_run("python2.7")) {
2264   open(my $egg_info_pipe, "-|",
2265        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2266   my @egg_info_errors = <$egg_info_pipe>;
2267   close($egg_info_pipe);
2268
2269   if ($?) {
2270     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2271       # egg_info apparently failed because it couldn't ask git for a build tag.
2272       # Specify no build tag.
2273       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2274       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2275       close($pysdk_cfg);
2276     } else {
2277       my $egg_info_exit = $? >> 8;
2278       foreach my $errline (@egg_info_errors) {
2279         warn $errline;
2280       }
2281       warn "python setup.py egg_info failed: exit $egg_info_exit";
2282       exit ($egg_info_exit || 1);
2283     }
2284   }
2285 }
2286
2287 # Hide messages from the install script (unless it fails: shell_or_die
2288 # will show $destdir.log in that case).
2289 open(STDOUT, ">>", "$destdir.log");
2290 open(STDERR, ">&", STDOUT);
2291
2292 if (-e "$destdir/crunch_scripts/install") {
2293     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2294 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2295     # Old version
2296     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2297 } elsif (-e "./install.sh") {
2298     shell_or_die (undef, "./install.sh", $install_dir);
2299 }
2300
2301 if ($archive_hash) {
2302     unlink "$destdir.archive_hash.new";
2303     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2304     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2305 }
2306
2307 close L;
2308
2309 sub can_run {
2310   my $command_name = shift;
2311   open(my $which, "-|", "which", $command_name);
2312   while (<$which>) { }
2313   close($which);
2314   return ($? == 0);
2315 }
2316
2317 sub shell_or_die
2318 {
2319   my $exitcode = shift;
2320
2321   if ($ENV{"DEBUG"}) {
2322     print STDERR "@_\n";
2323   }
2324   if (system (@_) != 0) {
2325     my $err = $!;
2326     my $code = $?;
2327     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2328     open STDERR, ">&STDERR_ORIG";
2329     system ("cat $destdir.log >&2");
2330     warn "@_ failed ($err): $exitstatus";
2331     if (defined($exitcode)) {
2332       exit $exitcode;
2333     }
2334     else {
2335       exit (($code >> 8) || 1);
2336     }
2337   }
2338 }
2339
2340 __DATA__