7661: add --by-pdh option to FUSE and use this option in crunch-job. Do not start...
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 GetOptions('force-unlock' => \$force_unlock,
131            'git-dir=s' => \$git_dir,
132            'job=s' => \$jobspec,
133            'job-api-token=s' => \$job_api_token,
134            'no-clear-tmp' => \$no_clear_tmp,
135            'resume-stash=s' => \$resume_stash,
136            'docker-bin=s' => \$docker_bin,
137     );
138
139 if (defined $job_api_token) {
140   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
141 }
142
143 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
144
145
146 $SIG{'USR1'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 1;
149 };
150 $SIG{'USR2'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 0;
153 };
154
155 my $arv = Arvados->new('apiVersion' => 'v1');
156
157 my $Job;
158 my $job_id;
159 my $dbh;
160 my $sth;
161 my @jobstep;
162
163 my $local_job;
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   $local_job = 0;
169 }
170 else
171 {
172   $local_job = JSON::decode_json($jobspec);
173 }
174
175
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely
178 # misconfigured.
179 my $cmd = ['true'];
180 if (($Job || $local_job)->{docker_image_locator}) {
181   $cmd = [$docker_bin, 'ps', '-q'];
182 }
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
185      $cmd,
186      {fork => 1});
187 if ($? != 0) {
188   Log(undef, "Sanity check failed: ".exit_status_s($?));
189   exit EX_TEMPFAIL;
190 }
191 Log(undef, "Sanity check OK");
192
193
194 my $User = api_call("users/current");
195
196 if (!$local_job) {
197   if (!$force_unlock) {
198     # Claim this job, and make sure nobody else does
199     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
200     if ($@) {
201       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
202       exit EX_TEMPFAIL;
203     };
204   }
205 }
206 else
207 {
208   if (!$resume_stash)
209   {
210     map { croak ("No $_ specified") unless $local_job->{$_} }
211     qw(script script_version script_parameters);
212   }
213
214   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
215   $local_job->{'started_at'} = gmtime;
216   $local_job->{'state'} = 'Running';
217
218   $Job = api_call("jobs/create", job => $local_job);
219 }
220 $job_id = $Job->{'uuid'};
221
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
224
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
228
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
230 if ($? == 0) {
231   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232   chomp($gem_versions);
233   chop($gem_versions);  # Closing parentheses
234 } else {
235   $gem_versions = "";
236 }
237 Log(undef,
238     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
239
240 Log (undef, "check slurm allocation");
241 my @slot;
242 my @node;
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
244 my @sinfo;
245 if (!$have_slurm)
246 {
247   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248   push @sinfo, "$localcpus localhost";
249 }
250 if (exists $ENV{SLURM_NODELIST})
251 {
252   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
253 }
254 foreach (@sinfo)
255 {
256   my ($ncpus, $slurm_nodelist) = split;
257   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
258
259   my @nodelist;
260   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
261   {
262     my $nodelist = $1;
263     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
264     {
265       my $ranges = $1;
266       foreach (split (",", $ranges))
267       {
268         my ($a, $b);
269         if (/(\d+)-(\d+)/)
270         {
271           $a = $1;
272           $b = $2;
273         }
274         else
275         {
276           $a = $_;
277           $b = $_;
278         }
279         push @nodelist, map {
280           my $n = $nodelist;
281           $n =~ s/\[[-,\d]+\]/$_/;
282           $n;
283         } ($a..$b);
284       }
285     }
286     else
287     {
288       push @nodelist, $nodelist;
289     }
290   }
291   foreach my $nodename (@nodelist)
292   {
293     Log (undef, "node $nodename - $ncpus slots");
294     my $node = { name => $nodename,
295                  ncpus => $ncpus,
296                  # The number of consecutive times a task has been dispatched
297                  # to this node and failed.
298                  losing_streak => 0,
299                  # The number of consecutive times that SLURM has reported
300                  # a node failure since the last successful task.
301                  fail_count => 0,
302                  # Don't dispatch work to this node until this time
303                  # (in seconds since the epoch) has passed.
304                  hold_until => 0 };
305     foreach my $cpu (1..$ncpus)
306     {
307       push @slot, { node => $node,
308                     cpu => $cpu };
309     }
310   }
311   push @node, @nodelist;
312 }
313
314
315
316 # Ensure that we get one jobstep running on each allocated node before
317 # we start overloading nodes with concurrent steps
318
319 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
320
321
322 $Job->update_attributes(
323   'tasks_summary' => { 'failed' => 0,
324                        'todo' => 1,
325                        'running' => 0,
326                        'done' => 0 });
327
328 Log (undef, "start");
329 $SIG{'INT'} = sub { $main::please_freeze = 1; };
330 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
331 $SIG{'TERM'} = \&croak;
332 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
333 $SIG{'ALRM'} = sub { $main::please_info = 1; };
334 $SIG{'CONT'} = sub { $main::please_continue = 1; };
335 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
336
337 $main::please_freeze = 0;
338 $main::please_info = 0;
339 $main::please_continue = 0;
340 $main::please_refresh = 0;
341 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
342
343 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
344 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
345 $ENV{"JOB_UUID"} = $job_id;
346
347
348 my @jobstep_todo = ();
349 my @jobstep_done = ();
350 my @jobstep_tomerge = ();
351 my $jobstep_tomerge_level = 0;
352 my $squeue_checked = 0;
353 my $latest_refresh = scalar time;
354
355
356
357 if (defined $Job->{thawedfromkey})
358 {
359   thaw ($Job->{thawedfromkey});
360 }
361 else
362 {
363   my $first_task = api_call("job_tasks/create", job_task => {
364     'job_uuid' => $Job->{'uuid'},
365     'sequence' => 0,
366     'qsequence' => 0,
367     'parameters' => {},
368   });
369   push @jobstep, { 'level' => 0,
370                    'failures' => 0,
371                    'arvados_task' => $first_task,
372                  };
373   push @jobstep_todo, 0;
374 }
375
376
377 if (!$have_slurm)
378 {
379   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
380 }
381
382 my $build_script = handle_readall(\*DATA);
383 my $nodelist = join(",", @node);
384 my $git_tar_count = 0;
385
386 if (!defined $no_clear_tmp) {
387   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
388   Log (undef, "Clean work dirs");
389
390   my $cleanpid = fork();
391   if ($cleanpid == 0)
392   {
393     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
394     # Then clean up work directories.
395     # TODO: When #5036 is done and widely deployed, we can limit mount's
396     # -t option to simply fuse.keep.
397     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
399     exit (1);
400   }
401   while (1)
402   {
403     last if $cleanpid == waitpid (-1, WNOHANG);
404     freeze_if_want_freeze ($cleanpid);
405     select (undef, undef, undef, 0.1);
406   }
407   if ($?) {
408     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
409     exit(EX_RETRY_UNLOCKED);
410   }
411 }
412
413 # If this job requires a Docker image, install that.
414 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
415 if ($docker_locator = $Job->{docker_image_locator}) {
416   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
417   if (!$docker_hash)
418   {
419     croak("No Docker image hash found from locator $docker_locator");
420   }
421   $docker_stream =~ s/^\.//;
422   my $docker_install_script = qq{
423 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
424     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
425 fi
426 };
427   my $docker_pid = fork();
428   if ($docker_pid == 0)
429   {
430     srun (["srun", "--nodelist=" . join(',', @node)],
431           ["/bin/sh", "-ec", $docker_install_script]);
432     exit ($?);
433   }
434   while (1)
435   {
436     last if $docker_pid == waitpid (-1, WNOHANG);
437     freeze_if_want_freeze ($docker_pid);
438     select (undef, undef, undef, 0.1);
439   }
440   if ($? != 0)
441   {
442     croak("Installing Docker image from $docker_locator exited "
443           .exit_status_s($?));
444   }
445
446   # Determine whether this version of Docker supports memory+swap limits.
447   srun(["srun", "--nodelist=" . $node[0]],
448        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
449       {fork => 1});
450   $docker_limitmem = ($? == 0);
451
452   # Find a non-root Docker user to use.
453   # Tries the default user for the container, then 'crunch', then 'nobody',
454   # testing for whether the actual user id is non-zero.  This defends against
455   # mistakes but not malice, but we intend to harden the security in the future
456   # so we don't want anyone getting used to their jobs running as root in their
457   # Docker containers.
458   my @tryusers = ("", "crunch", "nobody");
459   foreach my $try_user (@tryusers) {
460     my $try_user_arg;
461     if ($try_user eq "") {
462       Log(undef, "Checking if container default user is not UID 0");
463       $try_user_arg = "";
464     } else {
465       Log(undef, "Checking if user '$try_user' is not UID 0");
466       $try_user_arg = "--user=$try_user";
467     }
468     srun(["srun", "--nodelist=" . $node[0]],
469          ["/bin/sh", "-ec",
470           "a=`$docker_bin run --rm $try_user_arg $docker_hash id --user` && " .
471           " test \$a -ne 0"],
472          {fork => 1});
473     if ($? == 0) {
474       $dockeruserarg = $try_user_arg;
475       if ($try_user eq "") {
476         Log(undef, "Container will run with default user");
477       } else {
478         Log(undef, "Container will run with $dockeruserarg");
479       }
480       last;
481     }
482   }
483
484   if (!defined $dockeruserarg) {
485     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
486   }
487
488   if ($Job->{arvados_sdk_version}) {
489     # The job also specifies an Arvados SDK version.  Add the SDKs to the
490     # tar file for the build script to install.
491     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
492                        $Job->{arvados_sdk_version}));
493     add_git_archive("git", "--git-dir=$git_dir", "archive",
494                     "--prefix=.arvados.sdk/",
495                     $Job->{arvados_sdk_version}, "sdk");
496   }
497 }
498
499 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
500   # If script_version looks like an absolute path, *and* the --git-dir
501   # argument was not given -- which implies we were not invoked by
502   # crunch-dispatch -- we will use the given path as a working
503   # directory instead of resolving script_version to a git commit (or
504   # doing anything else with git).
505   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
506   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
507 }
508 else {
509   # Resolve the given script_version to a git commit sha1. Also, if
510   # the repository is remote, clone it into our local filesystem: this
511   # ensures "git archive" will work, and is necessary to reliably
512   # resolve a symbolic script_version like "master^".
513   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
514
515   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
516
517   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
518
519   # If we're running under crunch-dispatch, it will have already
520   # pulled the appropriate source tree into its own repository, and
521   # given us that repo's path as $git_dir.
522   #
523   # If we're running a "local" job, we might have to fetch content
524   # from a remote repository.
525   #
526   # (Currently crunch-dispatch gives a local path with --git-dir, but
527   # we might as well accept URLs there too in case it changes its
528   # mind.)
529   my $repo = $git_dir || $Job->{'repository'};
530
531   # Repository can be remote or local. If remote, we'll need to fetch it
532   # to a local dir before doing `git log` et al.
533   my $repo_location;
534
535   if ($repo =~ m{://|^[^/]*:}) {
536     # $repo is a git url we can clone, like git:// or https:// or
537     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
538     # not recognized here because distinguishing that from a local
539     # path is too fragile. If you really need something strange here,
540     # use the ssh:// form.
541     $repo_location = 'remote';
542   } elsif ($repo =~ m{^\.*/}) {
543     # $repo is a local path to a git index. We'll also resolve ../foo
544     # to ../foo/.git if the latter is a directory. To help
545     # disambiguate local paths from named hosted repositories, this
546     # form must be given as ./ or ../ if it's a relative path.
547     if (-d "$repo/.git") {
548       $repo = "$repo/.git";
549     }
550     $repo_location = 'local';
551   } else {
552     # $repo is none of the above. It must be the name of a hosted
553     # repository.
554     my $arv_repo_list = api_call("repositories/list",
555                                  'filters' => [['name','=',$repo]]);
556     my @repos_found = @{$arv_repo_list->{'items'}};
557     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
558     if ($n_found > 0) {
559       Log(undef, "Repository '$repo' -> "
560           . join(", ", map { $_->{'uuid'} } @repos_found));
561     }
562     if ($n_found != 1) {
563       croak("Error: Found $n_found repositories with name '$repo'.");
564     }
565     $repo = $repos_found[0]->{'fetch_url'};
566     $repo_location = 'remote';
567   }
568   Log(undef, "Using $repo_location repository '$repo'");
569   $ENV{"CRUNCH_SRC_URL"} = $repo;
570
571   # Resolve given script_version (we'll call that $treeish here) to a
572   # commit sha1 ($commit).
573   my $treeish = $Job->{'script_version'};
574   my $commit;
575   if ($repo_location eq 'remote') {
576     # We minimize excess object-fetching by re-using the same bare
577     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
578     # just keep adding remotes to it as needed.
579     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
580     my $gitcmd = "git --git-dir=\Q$local_repo\E";
581
582     # Set up our local repo for caching remote objects, making
583     # archives, etc.
584     if (!-d $local_repo) {
585       make_path($local_repo) or croak("Error: could not create $local_repo");
586     }
587     # This works (exits 0 and doesn't delete fetched objects) even
588     # if $local_repo is already initialized:
589     `$gitcmd init --bare`;
590     if ($?) {
591       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
592     }
593
594     # If $treeish looks like a hash (or abbrev hash) we look it up in
595     # our local cache first, since that's cheaper. (We don't want to
596     # do that with tags/branches though -- those change over time, so
597     # they should always be resolved by the remote repo.)
598     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
599       # Hide stderr because it's normal for this to fail:
600       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
601       if ($? == 0 &&
602           # Careful not to resolve a branch named abcdeff to commit 1234567:
603           $sha1 =~ /^$treeish/ &&
604           $sha1 =~ /^([0-9a-f]{40})$/s) {
605         $commit = $1;
606         Log(undef, "Commit $commit already present in $local_repo");
607       }
608     }
609
610     if (!defined $commit) {
611       # If $treeish isn't just a hash or abbrev hash, or isn't here
612       # yet, we need to fetch the remote to resolve it correctly.
613
614       # First, remove all local heads. This prevents a name that does
615       # not exist on the remote from resolving to (or colliding with)
616       # a previously fetched branch or tag (possibly from a different
617       # remote).
618       remove_tree("$local_repo/refs/heads", {keep_root => 1});
619
620       Log(undef, "Fetching objects from $repo to $local_repo");
621       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
622       if ($?) {
623         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
624       }
625     }
626
627     # Now that the data is all here, we will use our local repo for
628     # the rest of our git activities.
629     $repo = $local_repo;
630   }
631
632   my $gitcmd = "git --git-dir=\Q$repo\E";
633   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
634   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
635     croak("`$gitcmd rev-list` exited "
636           .exit_status_s($?)
637           .", '$treeish' not found, giving up");
638   }
639   $commit = $1;
640   Log(undef, "Version $treeish is commit $commit");
641
642   if ($commit ne $Job->{'script_version'}) {
643     # Record the real commit id in the database, frozentokey, logs,
644     # etc. -- instead of an abbreviation or a branch name which can
645     # become ambiguous or point to a different commit in the future.
646     if (!$Job->update_attributes('script_version' => $commit)) {
647       croak("Error: failed to update job's script_version attribute");
648     }
649   }
650
651   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
652   add_git_archive("$gitcmd archive ''\Q$commit\E");
653 }
654
655 my $git_archive = combined_git_archive();
656 if (!defined $git_archive) {
657   Log(undef, "Skip install phase (no git archive)");
658   if ($have_slurm) {
659     Log(undef, "Warning: This probably means workers have no source tree!");
660   }
661 }
662 else {
663   my $install_exited;
664   my $install_script_tries_left = 3;
665   for (my $attempts = 0; $attempts < 3; $attempts++) {
666     Log(undef, "Run install script on all workers");
667
668     my @srunargs = ("srun",
669                     "--nodelist=$nodelist",
670                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
671     my @execargs = ("sh", "-c",
672                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
673
674     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
675     my ($install_stderr_r, $install_stderr_w);
676     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
677     set_nonblocking($install_stderr_r);
678     my $installpid = fork();
679     if ($installpid == 0)
680     {
681       close($install_stderr_r);
682       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
683       open(STDOUT, ">&", $install_stderr_w);
684       open(STDERR, ">&", $install_stderr_w);
685       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
686       exit (1);
687     }
688     close($install_stderr_w);
689     # Tell freeze_if_want_freeze how to kill the child, otherwise the
690     # "waitpid(installpid)" loop won't get interrupted by a freeze:
691     $proc{$installpid} = {};
692     my $stderr_buf = '';
693     # Track whether anything appears on stderr other than slurm errors
694     # ("srun: ...") and the "starting: ..." message printed by the
695     # srun subroutine itself:
696     my $stderr_anything_from_script = 0;
697     my $match_our_own_errors = '^(srun: error: |starting: \[)';
698     while ($installpid != waitpid(-1, WNOHANG)) {
699       freeze_if_want_freeze ($installpid);
700       # Wait up to 0.1 seconds for something to appear on stderr, then
701       # do a non-blocking read.
702       my $bits = fhbits($install_stderr_r);
703       select ($bits, undef, $bits, 0.1);
704       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
705       {
706         while ($stderr_buf =~ /^(.*?)\n/) {
707           my $line = $1;
708           substr $stderr_buf, 0, 1+length($line), "";
709           Log(undef, "stderr $line");
710           if ($line !~ /$match_our_own_errors/) {
711             $stderr_anything_from_script = 1;
712           }
713         }
714       }
715     }
716     delete $proc{$installpid};
717     $install_exited = $?;
718     close($install_stderr_r);
719     if (length($stderr_buf) > 0) {
720       if ($stderr_buf !~ /$match_our_own_errors/) {
721         $stderr_anything_from_script = 1;
722       }
723       Log(undef, "stderr $stderr_buf")
724     }
725
726     Log (undef, "Install script exited ".exit_status_s($install_exited));
727     last if $install_exited == 0 || $main::please_freeze;
728     # If the install script fails but doesn't print an error message,
729     # the next thing anyone is likely to do is just run it again in
730     # case it was a transient problem like "slurm communication fails
731     # because the network isn't reliable enough". So we'll just do
732     # that ourselves (up to 3 attempts in total). OTOH, if there is an
733     # error message, the problem is more likely to have a real fix and
734     # we should fail the job so the fixing process can start, instead
735     # of doing 2 more attempts.
736     last if $stderr_anything_from_script;
737   }
738
739   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
740     unlink($tar_filename);
741   }
742
743   if ($install_exited != 0) {
744     croak("Giving up");
745   }
746 }
747
748 foreach (qw (script script_version script_parameters runtime_constraints))
749 {
750   Log (undef,
751        "$_ " .
752        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
753 }
754 foreach (split (/\n/, $Job->{knobs}))
755 {
756   Log (undef, "knob " . $_);
757 }
758
759
760
761 $main::success = undef;
762
763
764
765 ONELEVEL:
766
767 my $thisround_succeeded = 0;
768 my $thisround_failed = 0;
769 my $thisround_failed_multiple = 0;
770 my $working_slot_count = scalar(@slot);
771
772 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
773                        or $a <=> $b } @jobstep_todo;
774 my $level = $jobstep[$jobstep_todo[0]]->{level};
775
776 my $initial_tasks_this_level = 0;
777 foreach my $id (@jobstep_todo) {
778   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
779 }
780
781 # If the number of tasks scheduled at this level #T is smaller than the number
782 # of slots available #S, only use the first #T slots, or the first slot on
783 # each node, whichever number is greater.
784 #
785 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
786 # based on these numbers.  Using fewer slots makes more resources available
787 # to each individual task, which should normally be a better strategy when
788 # there are fewer of them running with less parallelism.
789 #
790 # Note that this calculation is not redone if the initial tasks at
791 # this level queue more tasks at the same level.  This may harm
792 # overall task throughput for that level.
793 my @freeslot;
794 if ($initial_tasks_this_level < @node) {
795   @freeslot = (0..$#node);
796 } elsif ($initial_tasks_this_level < @slot) {
797   @freeslot = (0..$initial_tasks_this_level - 1);
798 } else {
799   @freeslot = (0..$#slot);
800 }
801 my $round_num_freeslots = scalar(@freeslot);
802
803 my %round_max_slots = ();
804 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
805   my $this_slot = $slot[$freeslot[$ii]];
806   my $node_name = $this_slot->{node}->{name};
807   $round_max_slots{$node_name} ||= $this_slot->{cpu};
808   last if (scalar(keys(%round_max_slots)) >= @node);
809 }
810
811 Log(undef, "start level $level with $round_num_freeslots slots");
812 my @holdslot;
813 my %reader;
814 my $progress_is_dirty = 1;
815 my $progress_stats_updated = 0;
816
817 update_progress_stats();
818
819
820 THISROUND:
821 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
822 {
823   my $id = $jobstep_todo[$todo_ptr];
824   my $Jobstep = $jobstep[$id];
825   if ($Jobstep->{level} != $level)
826   {
827     next;
828   }
829
830   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
831   set_nonblocking($reader{$id});
832
833   my $childslot = $freeslot[0];
834   my $childnode = $slot[$childslot]->{node};
835   my $childslotname = join (".",
836                             $slot[$childslot]->{node}->{name},
837                             $slot[$childslot]->{cpu});
838
839   my $childpid = fork();
840   if ($childpid == 0)
841   {
842     $SIG{'INT'} = 'DEFAULT';
843     $SIG{'QUIT'} = 'DEFAULT';
844     $SIG{'TERM'} = 'DEFAULT';
845
846     foreach (values (%reader))
847     {
848       close($_);
849     }
850     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
851     open(STDOUT,">&writer");
852     open(STDERR,">&writer");
853
854     undef $dbh;
855     undef $sth;
856
857     delete $ENV{"GNUPGHOME"};
858     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
859     $ENV{"TASK_QSEQUENCE"} = $id;
860     $ENV{"TASK_SEQUENCE"} = $level;
861     $ENV{"JOB_SCRIPT"} = $Job->{script};
862     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
863       $param =~ tr/a-z/A-Z/;
864       $ENV{"JOB_PARAMETER_$param"} = $value;
865     }
866     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
867     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
868     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
869     $ENV{"HOME"} = $ENV{"TASK_WORK"};
870     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
871     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
872     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
873     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
874
875     $ENV{"GZIP"} = "-n";
876
877     my @srunargs = (
878       "srun",
879       "--nodelist=".$childnode->{name},
880       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
881       "--job-name=$job_id.$id.$$",
882         );
883
884     my $stdbuf = " stdbuf --output=0 --error=0 ";
885
886     my $command =
887         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
888         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
889         ."&& cd $ENV{CRUNCH_TMP} "
890         # These environment variables get used explicitly later in
891         # $command.  No tool is expected to read these values directly.
892         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
893         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
894         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
895         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
896     $command .= "&& exec arv-mount --by-pdh --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
897     if ($docker_hash)
898     {
899       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
900       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
901       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
902       # We only set memory limits if Docker lets us limit both memory and swap.
903       # Memory limits alone have been supported longer, but subprocesses tend
904       # to get SIGKILL if they exceed that without any swap limit set.
905       # See #5642 for additional background.
906       if ($docker_limitmem) {
907         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
908       }
909
910       # The source tree and $destdir directory (which we have
911       # installed on the worker host) are available in the container,
912       # under the same path.
913       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
914       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
915
916       # Currently, we make arv-mount's mount point appear at /keep
917       # inside the container (instead of using the same path as the
918       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
919       # crunch scripts and utilities must not rely on this. They must
920       # use $TASK_KEEPMOUNT.
921       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
922       $ENV{TASK_KEEPMOUNT} = "/keep";
923
924       # TASK_WORK is almost exactly like a docker data volume: it
925       # starts out empty, is writable, and persists until no
926       # containers use it any more. We don't use --volumes-from to
927       # share it with other containers: it is only accessible to this
928       # task, and it goes away when this task stops.
929       #
930       # However, a docker data volume is writable only by root unless
931       # the mount point already happens to exist in the container with
932       # different permissions. Therefore, we [1] assume /tmp already
933       # exists in the image and is writable by the crunch user; [2]
934       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
935       # writable if they are created by docker while setting up the
936       # other --volumes); and [3] create $TASK_WORK inside the
937       # container using $build_script.
938       $command .= "--volume=/tmp ";
939       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
940       $ENV{"HOME"} = $ENV{"TASK_WORK"};
941       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
942
943       # TODO: Share a single JOB_WORK volume across all task
944       # containers on a given worker node, and delete it when the job
945       # ends (and, in case that doesn't work, when the next job
946       # starts).
947       #
948       # For now, use the same approach as TASK_WORK above.
949       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
950
951       while (my ($env_key, $env_val) = each %ENV)
952       {
953         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
954           $command .= "--env=\Q$env_key=$env_val\E ";
955         }
956       }
957       $command .= "--env=\QHOME=$ENV{HOME}\E ";
958       $command .= "\Q$docker_hash\E ";
959
960       if ($Job->{arvados_sdk_version}) {
961         $command .= $stdbuf;
962         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
963       } else {
964         $command .= "/bin/sh -c \'mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
965             "if which stdbuf >/dev/null ; then " .
966             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
967             " else " .
968             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
969             " fi\'";
970       }
971     } else {
972       # Non-docker run
973       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
974       $command .= $stdbuf;
975       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
976     }
977
978     my @execargs = ('bash', '-c', $command);
979     srun (\@srunargs, \@execargs, undef, $build_script);
980     # exec() failed, we assume nothing happened.
981     die "srun() failed on build script\n";
982   }
983   close("writer");
984   if (!defined $childpid)
985   {
986     close $reader{$id};
987     delete $reader{$id};
988     next;
989   }
990   shift @freeslot;
991   $proc{$childpid} = { jobstep => $id,
992                        time => time,
993                        slot => $childslot,
994                        jobstepname => "$job_id.$id.$childpid",
995                      };
996   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
997   $slot[$childslot]->{pid} = $childpid;
998
999   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1000   Log ($id, "child $childpid started on $childslotname");
1001   $Jobstep->{starttime} = time;
1002   $Jobstep->{node} = $childnode->{name};
1003   $Jobstep->{slotindex} = $childslot;
1004   delete $Jobstep->{stderr};
1005   delete $Jobstep->{finishtime};
1006   delete $Jobstep->{tempfail};
1007
1008   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1009   $Jobstep->{'arvados_task'}->save;
1010
1011   splice @jobstep_todo, $todo_ptr, 1;
1012   --$todo_ptr;
1013
1014   $progress_is_dirty = 1;
1015
1016   while (!@freeslot
1017          ||
1018          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1019   {
1020     last THISROUND if $main::please_freeze || defined($main::success);
1021     if ($main::please_info)
1022     {
1023       $main::please_info = 0;
1024       freeze();
1025       create_output_collection();
1026       save_meta(1);
1027       update_progress_stats();
1028     }
1029     my $gotsome
1030         = readfrompipes ()
1031         + reapchildren ();
1032     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1033     {
1034       check_refresh_wanted();
1035       check_squeue();
1036       update_progress_stats();
1037       select (undef, undef, undef, 0.1);
1038     }
1039     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1040     {
1041       update_progress_stats();
1042     }
1043     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1044                                         $_->{node}->{hold_count} < 4 } @slot);
1045     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1046         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1047     {
1048       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1049           .($thisround_failed+$thisround_succeeded)
1050           .") -- giving up on this round";
1051       Log (undef, $message);
1052       last THISROUND;
1053     }
1054
1055     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1056     for (my $i=$#freeslot; $i>=0; $i--) {
1057       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1058         push @holdslot, (splice @freeslot, $i, 1);
1059       }
1060     }
1061     for (my $i=$#holdslot; $i>=0; $i--) {
1062       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1063         push @freeslot, (splice @holdslot, $i, 1);
1064       }
1065     }
1066
1067     # give up if no nodes are succeeding
1068     if ($working_slot_count < 1) {
1069       Log(undef, "Every node has failed -- giving up");
1070       last THISROUND;
1071     }
1072   }
1073 }
1074
1075
1076 push @freeslot, splice @holdslot;
1077 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1078
1079
1080 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1081 while (%proc)
1082 {
1083   if ($main::please_continue) {
1084     $main::please_continue = 0;
1085     goto THISROUND;
1086   }
1087   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1088   readfrompipes ();
1089   if (!reapchildren())
1090   {
1091     check_refresh_wanted();
1092     check_squeue();
1093     update_progress_stats();
1094     select (undef, undef, undef, 0.1);
1095     killem (keys %proc) if $main::please_freeze;
1096   }
1097 }
1098
1099 update_progress_stats();
1100 freeze_if_want_freeze();
1101
1102
1103 if (!defined $main::success)
1104 {
1105   if (!@jobstep_todo) {
1106     $main::success = 1;
1107   } elsif ($working_slot_count < 1) {
1108     save_output_collection();
1109     save_meta();
1110     exit(EX_RETRY_UNLOCKED);
1111   } elsif ($thisround_succeeded == 0 &&
1112            ($thisround_failed == 0 || $thisround_failed > 4)) {
1113     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1114     Log (undef, $message);
1115     $main::success = 0;
1116   }
1117 }
1118
1119 goto ONELEVEL if !defined $main::success;
1120
1121
1122 release_allocation();
1123 freeze();
1124 my $collated_output = save_output_collection();
1125 Log (undef, "finish");
1126
1127 save_meta();
1128
1129 my $final_state;
1130 if ($collated_output && $main::success) {
1131   $final_state = 'Complete';
1132 } else {
1133   $final_state = 'Failed';
1134 }
1135 $Job->update_attributes('state' => $final_state);
1136
1137 exit (($final_state eq 'Complete') ? 0 : 1);
1138
1139
1140
1141 sub update_progress_stats
1142 {
1143   $progress_stats_updated = time;
1144   return if !$progress_is_dirty;
1145   my ($todo, $done, $running) = (scalar @jobstep_todo,
1146                                  scalar @jobstep_done,
1147                                  scalar keys(%proc));
1148   $Job->{'tasks_summary'} ||= {};
1149   $Job->{'tasks_summary'}->{'todo'} = $todo;
1150   $Job->{'tasks_summary'}->{'done'} = $done;
1151   $Job->{'tasks_summary'}->{'running'} = $running;
1152   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1153   Log (undef, "status: $done done, $running running, $todo todo");
1154   $progress_is_dirty = 0;
1155 }
1156
1157
1158
1159 sub reapchildren
1160 {
1161   my $pid = waitpid (-1, WNOHANG);
1162   return 0 if $pid <= 0;
1163
1164   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1165                   . "."
1166                   . $slot[$proc{$pid}->{slot}]->{cpu});
1167   my $jobstepid = $proc{$pid}->{jobstep};
1168   my $elapsed = time - $proc{$pid}->{time};
1169   my $Jobstep = $jobstep[$jobstepid];
1170
1171   my $childstatus = $?;
1172   my $exitvalue = $childstatus >> 8;
1173   my $exitinfo = "exit ".exit_status_s($childstatus);
1174   $Jobstep->{'arvados_task'}->reload;
1175   my $task_success = $Jobstep->{'arvados_task'}->{success};
1176
1177   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1178
1179   if (!defined $task_success) {
1180     # task did not indicate one way or the other --> fail
1181     $Jobstep->{'arvados_task'}->{success} = 0;
1182     $Jobstep->{'arvados_task'}->save;
1183     $task_success = 0;
1184   }
1185
1186   if (!$task_success)
1187   {
1188     my $temporary_fail;
1189     $temporary_fail ||= $Jobstep->{tempfail};
1190     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1191
1192     ++$thisround_failed;
1193     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1194
1195     # Check for signs of a failed or misconfigured node
1196     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1197         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1198       # Don't count this against jobstep failure thresholds if this
1199       # node is already suspected faulty and srun exited quickly
1200       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1201           $elapsed < 5) {
1202         Log ($jobstepid, "blaming failure on suspect node " .
1203              $slot[$proc{$pid}->{slot}]->{node}->{name});
1204         $temporary_fail ||= 1;
1205       }
1206       ban_node_by_slot($proc{$pid}->{slot});
1207     }
1208
1209     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1210                              ++$Jobstep->{'failures'},
1211                              $temporary_fail ? 'temporary' : 'permanent',
1212                              $elapsed));
1213
1214     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1215       # Give up on this task, and the whole job
1216       $main::success = 0;
1217     }
1218     # Put this task back on the todo queue
1219     push @jobstep_todo, $jobstepid;
1220     $Job->{'tasks_summary'}->{'failed'}++;
1221   }
1222   else
1223   {
1224     ++$thisround_succeeded;
1225     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1226     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1227     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1228     push @jobstep_done, $jobstepid;
1229     Log ($jobstepid, "success in $elapsed seconds");
1230   }
1231   $Jobstep->{exitcode} = $childstatus;
1232   $Jobstep->{finishtime} = time;
1233   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1234   $Jobstep->{'arvados_task'}->save;
1235   process_stderr ($jobstepid, $task_success);
1236   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1237                            length($Jobstep->{'arvados_task'}->{output}),
1238                            $Jobstep->{'arvados_task'}->{output}));
1239
1240   close $reader{$jobstepid};
1241   delete $reader{$jobstepid};
1242   delete $slot[$proc{$pid}->{slot}]->{pid};
1243   push @freeslot, $proc{$pid}->{slot};
1244   delete $proc{$pid};
1245
1246   if ($task_success) {
1247     # Load new tasks
1248     my $newtask_list = [];
1249     my $newtask_results;
1250     do {
1251       $newtask_results = api_call(
1252         "job_tasks/list",
1253         'where' => {
1254           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1255         },
1256         'order' => 'qsequence',
1257         'offset' => scalar(@$newtask_list),
1258       );
1259       push(@$newtask_list, @{$newtask_results->{items}});
1260     } while (@{$newtask_results->{items}});
1261     foreach my $arvados_task (@$newtask_list) {
1262       my $jobstep = {
1263         'level' => $arvados_task->{'sequence'},
1264         'failures' => 0,
1265         'arvados_task' => $arvados_task
1266       };
1267       push @jobstep, $jobstep;
1268       push @jobstep_todo, $#jobstep;
1269     }
1270   }
1271
1272   $progress_is_dirty = 1;
1273   1;
1274 }
1275
1276 sub check_refresh_wanted
1277 {
1278   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1279   if (@stat && $stat[9] > $latest_refresh) {
1280     $latest_refresh = scalar time;
1281     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1282     for my $attr ('cancelled_at',
1283                   'cancelled_by_user_uuid',
1284                   'cancelled_by_client_uuid',
1285                   'state') {
1286       $Job->{$attr} = $Job2->{$attr};
1287     }
1288     if ($Job->{'state'} ne "Running") {
1289       if ($Job->{'state'} eq "Cancelled") {
1290         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1291       } else {
1292         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1293       }
1294       $main::success = 0;
1295       $main::please_freeze = 1;
1296     }
1297   }
1298 }
1299
1300 sub check_squeue
1301 {
1302   my $last_squeue_check = $squeue_checked;
1303
1304   # Do not call `squeue` or check the kill list more than once every
1305   # 15 seconds.
1306   return if $last_squeue_check > time - 15;
1307   $squeue_checked = time;
1308
1309   # Look for children from which we haven't received stderr data since
1310   # the last squeue check. If no such children exist, all procs are
1311   # alive and there's no need to even look at squeue.
1312   #
1313   # As long as the crunchstat poll interval (10s) is shorter than the
1314   # squeue check interval (15s) this should make the squeue check an
1315   # infrequent event.
1316   my $silent_procs = 0;
1317   for my $jobstep (values %proc)
1318   {
1319     if ($jobstep->{stderr_at} < $last_squeue_check)
1320     {
1321       $silent_procs++;
1322     }
1323   }
1324   return if $silent_procs == 0;
1325
1326   # use killem() on procs whose killtime is reached
1327   while (my ($pid, $jobstep) = each %proc)
1328   {
1329     if (exists $jobstep->{killtime}
1330         && $jobstep->{killtime} <= time
1331         && $jobstep->{stderr_at} < $last_squeue_check)
1332     {
1333       my $sincewhen = "";
1334       if ($jobstep->{stderr_at}) {
1335         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1336       }
1337       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1338       killem ($pid);
1339     }
1340   }
1341
1342   if (!$have_slurm)
1343   {
1344     # here is an opportunity to check for mysterious problems with local procs
1345     return;
1346   }
1347
1348   # Get a list of steps still running.  Note: squeue(1) says --steps
1349   # selects a format (which we override anyway) and allows us to
1350   # specify which steps we're interested in (which we don't).
1351   # Importantly, it also changes the meaning of %j from "job name" to
1352   # "step name" and (although this isn't mentioned explicitly in the
1353   # docs) switches from "one line per job" mode to "one line per step"
1354   # mode. Without it, we'd just get a list of one job, instead of a
1355   # list of N steps.
1356   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1357   if ($? != 0)
1358   {
1359     Log(undef, "warning: squeue exit status $? ($!)");
1360     return;
1361   }
1362   chop @squeue;
1363
1364   # which of my jobsteps are running, according to squeue?
1365   my %ok;
1366   for my $jobstepname (@squeue)
1367   {
1368     $ok{$jobstepname} = 1;
1369   }
1370
1371   # Check for child procs >60s old and not mentioned by squeue.
1372   while (my ($pid, $jobstep) = each %proc)
1373   {
1374     if ($jobstep->{time} < time - 60
1375         && $jobstep->{jobstepname}
1376         && !exists $ok{$jobstep->{jobstepname}}
1377         && !exists $jobstep->{killtime})
1378     {
1379       # According to slurm, this task has ended (successfully or not)
1380       # -- but our srun child hasn't exited. First we must wait (30
1381       # seconds) in case this is just a race between communication
1382       # channels. Then, if our srun child process still hasn't
1383       # terminated, we'll conclude some slurm communication
1384       # error/delay has caused the task to die without notifying srun,
1385       # and we'll kill srun ourselves.
1386       $jobstep->{killtime} = time + 30;
1387       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1388     }
1389   }
1390 }
1391
1392
1393 sub release_allocation
1394 {
1395   if ($have_slurm)
1396   {
1397     Log (undef, "release job allocation");
1398     system "scancel $ENV{SLURM_JOB_ID}";
1399   }
1400 }
1401
1402
1403 sub readfrompipes
1404 {
1405   my $gotsome = 0;
1406   foreach my $job (keys %reader)
1407   {
1408     my $buf;
1409     while (0 < sysread ($reader{$job}, $buf, 8192))
1410     {
1411       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1412       $jobstep[$job]->{stderr_at} = time;
1413       $jobstep[$job]->{stderr} .= $buf;
1414       preprocess_stderr ($job);
1415       if (length ($jobstep[$job]->{stderr}) > 16384)
1416       {
1417         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1418       }
1419       $gotsome = 1;
1420     }
1421   }
1422   return $gotsome;
1423 }
1424
1425
1426 sub preprocess_stderr
1427 {
1428   my $job = shift;
1429
1430   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1431     my $line = $1;
1432     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1433     Log ($job, "stderr $line");
1434     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1435       # whoa.
1436       $main::please_freeze = 1;
1437     }
1438     elsif ($line =~ /srun: error: Node failure on/) {
1439       my $job_slot_index = $jobstep[$job]->{slotindex};
1440       $slot[$job_slot_index]->{node}->{fail_count}++;
1441       $jobstep[$job]->{tempfail} = 1;
1442       ban_node_by_slot($job_slot_index);
1443     }
1444     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1445       $jobstep[$job]->{tempfail} = 1;
1446       ban_node_by_slot($jobstep[$job]->{slotindex});
1447     }
1448     elsif ($line =~ /arvados\.errors\.Keep/) {
1449       $jobstep[$job]->{tempfail} = 1;
1450     }
1451   }
1452 }
1453
1454
1455 sub process_stderr
1456 {
1457   my $job = shift;
1458   my $task_success = shift;
1459   preprocess_stderr ($job);
1460
1461   map {
1462     Log ($job, "stderr $_");
1463   } split ("\n", $jobstep[$job]->{stderr});
1464 }
1465
1466 sub fetch_block
1467 {
1468   my $hash = shift;
1469   my $keep;
1470   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1471     Log(undef, "fetch_block run error from arv-get $hash: $!");
1472     return undef;
1473   }
1474   my $output_block = "";
1475   while (1) {
1476     my $buf;
1477     my $bytes = sysread($keep, $buf, 1024 * 1024);
1478     if (!defined $bytes) {
1479       Log(undef, "fetch_block read error from arv-get: $!");
1480       $output_block = undef;
1481       last;
1482     } elsif ($bytes == 0) {
1483       # sysread returns 0 at the end of the pipe.
1484       last;
1485     } else {
1486       # some bytes were read into buf.
1487       $output_block .= $buf;
1488     }
1489   }
1490   close $keep;
1491   if ($?) {
1492     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1493     $output_block = undef;
1494   }
1495   return $output_block;
1496 }
1497
1498 # Create a collection by concatenating the output of all tasks (each
1499 # task's output is either a manifest fragment, a locator for a
1500 # manifest fragment stored in Keep, or nothing at all). Return the
1501 # portable_data_hash of the new collection.
1502 sub create_output_collection
1503 {
1504   Log (undef, "collate");
1505
1506   my ($child_out, $child_in);
1507   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1508 import arvados
1509 import sys
1510 print (arvados.api("v1").collections().
1511        create(body={"manifest_text": sys.stdin.read()}).
1512        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1513 }, retry_count());
1514
1515   my $task_idx = -1;
1516   my $manifest_size = 0;
1517   for (@jobstep)
1518   {
1519     ++$task_idx;
1520     my $output = $_->{'arvados_task'}->{output};
1521     next if (!defined($output));
1522     my $next_write;
1523     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1524       $next_write = fetch_block($output);
1525     } else {
1526       $next_write = $output;
1527     }
1528     if (defined($next_write)) {
1529       if (!defined(syswrite($child_in, $next_write))) {
1530         # There's been an error writing.  Stop the loop.
1531         # We'll log details about the exit code later.
1532         last;
1533       } else {
1534         $manifest_size += length($next_write);
1535       }
1536     } else {
1537       my $uuid = $_->{'arvados_task'}->{'uuid'};
1538       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1539       $main::success = 0;
1540     }
1541   }
1542   close($child_in);
1543   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1544
1545   my $joboutput;
1546   my $s = IO::Select->new($child_out);
1547   if ($s->can_read(120)) {
1548     sysread($child_out, $joboutput, 1024 * 1024);
1549     waitpid($pid, 0);
1550     if ($?) {
1551       Log(undef, "output collection creation exited " . exit_status_s($?));
1552       $joboutput = undef;
1553     } else {
1554       chomp($joboutput);
1555     }
1556   } else {
1557     Log (undef, "timed out while creating output collection");
1558     foreach my $signal (2, 2, 2, 15, 15, 9) {
1559       kill($signal, $pid);
1560       last if waitpid($pid, WNOHANG) == -1;
1561       sleep(1);
1562     }
1563   }
1564   close($child_out);
1565
1566   return $joboutput;
1567 }
1568
1569 # Calls create_output_collection, logs the result, and returns it.
1570 # If that was successful, save that as the output in the job record.
1571 sub save_output_collection {
1572   my $collated_output = create_output_collection();
1573
1574   if (!$collated_output) {
1575     Log(undef, "Failed to write output collection");
1576   }
1577   else {
1578     Log(undef, "job output $collated_output");
1579     $Job->update_attributes('output' => $collated_output);
1580   }
1581   return $collated_output;
1582 }
1583
1584 sub killem
1585 {
1586   foreach (@_)
1587   {
1588     my $sig = 2;                # SIGINT first
1589     if (exists $proc{$_}->{"sent_$sig"} &&
1590         time - $proc{$_}->{"sent_$sig"} > 4)
1591     {
1592       $sig = 15;                # SIGTERM if SIGINT doesn't work
1593     }
1594     if (exists $proc{$_}->{"sent_$sig"} &&
1595         time - $proc{$_}->{"sent_$sig"} > 4)
1596     {
1597       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1598     }
1599     if (!exists $proc{$_}->{"sent_$sig"})
1600     {
1601       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1602       kill $sig, $_;
1603       select (undef, undef, undef, 0.1);
1604       if ($sig == 2)
1605       {
1606         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1607       }
1608       $proc{$_}->{"sent_$sig"} = time;
1609       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1610     }
1611   }
1612 }
1613
1614
1615 sub fhbits
1616 {
1617   my($bits);
1618   for (@_) {
1619     vec($bits,fileno($_),1) = 1;
1620   }
1621   $bits;
1622 }
1623
1624
1625 # Send log output to Keep via arv-put.
1626 #
1627 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1628 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1629 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1630 # $log_pipe_pid is the pid of the arv-put subprocess.
1631 #
1632 # The only functions that should access these variables directly are:
1633 #
1634 # log_writer_start($logfilename)
1635 #     Starts an arv-put pipe, reading data on stdin and writing it to
1636 #     a $logfilename file in an output collection.
1637 #
1638 # log_writer_read_output([$timeout])
1639 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1640 #     Passes $timeout to the select() call, with a default of 0.01.
1641 #     Returns the result of the last read() call on $log_pipe_out, or
1642 #     -1 if read() wasn't called because select() timed out.
1643 #     Only other log_writer_* functions should need to call this.
1644 #
1645 # log_writer_send($txt)
1646 #     Writes $txt to the output log collection.
1647 #
1648 # log_writer_finish()
1649 #     Closes the arv-put pipe and returns the output that it produces.
1650 #
1651 # log_writer_is_active()
1652 #     Returns a true value if there is currently a live arv-put
1653 #     process, false otherwise.
1654 #
1655 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1656     $log_pipe_pid);
1657
1658 sub log_writer_start($)
1659 {
1660   my $logfilename = shift;
1661   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1662                         'arv-put',
1663                         '--stream',
1664                         '--retries', '3',
1665                         '--filename', $logfilename,
1666                         '-');
1667   $log_pipe_out_buf = "";
1668   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1669 }
1670
1671 sub log_writer_read_output {
1672   my $timeout = shift || 0.01;
1673   my $read = -1;
1674   while ($read && $log_pipe_out_select->can_read($timeout)) {
1675     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1676                  length($log_pipe_out_buf));
1677   }
1678   if (!defined($read)) {
1679     Log(undef, "error reading log manifest from arv-put: $!");
1680   }
1681   return $read;
1682 }
1683
1684 sub log_writer_send($)
1685 {
1686   my $txt = shift;
1687   print $log_pipe_in $txt;
1688   log_writer_read_output();
1689 }
1690
1691 sub log_writer_finish()
1692 {
1693   return unless $log_pipe_pid;
1694
1695   close($log_pipe_in);
1696
1697   my $read_result = log_writer_read_output(120);
1698   if ($read_result == -1) {
1699     Log (undef, "timed out reading from 'arv-put'");
1700   } elsif ($read_result != 0) {
1701     Log(undef, "failed to read arv-put log manifest to EOF");
1702   }
1703
1704   waitpid($log_pipe_pid, 0);
1705   if ($?) {
1706     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1707   }
1708
1709   close($log_pipe_out);
1710   my $arv_put_output = $log_pipe_out_buf;
1711   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1712       $log_pipe_out_select = undef;
1713
1714   return $arv_put_output;
1715 }
1716
1717 sub log_writer_is_active() {
1718   return $log_pipe_pid;
1719 }
1720
1721 sub Log                         # ($jobstep_id, $logmessage)
1722 {
1723   if ($_[1] =~ /\n/) {
1724     for my $line (split (/\n/, $_[1])) {
1725       Log ($_[0], $line);
1726     }
1727     return;
1728   }
1729   my $fh = select STDERR; $|=1; select $fh;
1730   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1731   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1732   $message .= "\n";
1733   my $datetime;
1734   if (log_writer_is_active() || -t STDERR) {
1735     my @gmtime = gmtime;
1736     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1737                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1738   }
1739   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1740
1741   if (log_writer_is_active()) {
1742     log_writer_send($datetime . " " . $message);
1743   }
1744 }
1745
1746
1747 sub croak
1748 {
1749   my ($package, $file, $line) = caller;
1750   my $message = "@_ at $file line $line\n";
1751   Log (undef, $message);
1752   freeze() if @jobstep_todo;
1753   create_output_collection() if @jobstep_todo;
1754   cleanup();
1755   save_meta();
1756   die;
1757 }
1758
1759
1760 sub cleanup
1761 {
1762   return unless $Job;
1763   if ($Job->{'state'} eq 'Cancelled') {
1764     $Job->update_attributes('finished_at' => scalar gmtime);
1765   } else {
1766     $Job->update_attributes('state' => 'Failed');
1767   }
1768 }
1769
1770
1771 sub save_meta
1772 {
1773   my $justcheckpoint = shift; # false if this will be the last meta saved
1774   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1775   return unless log_writer_is_active();
1776
1777   my $log_manifest = "";
1778   if ($Job->{log}) {
1779     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1780     $log_manifest .= $prev_log_coll->{manifest_text};
1781   }
1782   $log_manifest .= log_writer_finish();
1783
1784   my $log_coll = api_call(
1785     "collections/create", ensure_unique_name => 1, collection => {
1786       manifest_text => $log_manifest,
1787       owner_uuid => $Job->{owner_uuid},
1788       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1789     });
1790   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1791   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1792 }
1793
1794
1795 sub freeze_if_want_freeze
1796 {
1797   if ($main::please_freeze)
1798   {
1799     release_allocation();
1800     if (@_)
1801     {
1802       # kill some srun procs before freeze+stop
1803       map { $proc{$_} = {} } @_;
1804       while (%proc)
1805       {
1806         killem (keys %proc);
1807         select (undef, undef, undef, 0.1);
1808         my $died;
1809         while (($died = waitpid (-1, WNOHANG)) > 0)
1810         {
1811           delete $proc{$died};
1812         }
1813       }
1814     }
1815     freeze();
1816     create_output_collection();
1817     cleanup();
1818     save_meta();
1819     exit 1;
1820   }
1821 }
1822
1823
1824 sub freeze
1825 {
1826   Log (undef, "Freeze not implemented");
1827   return;
1828 }
1829
1830
1831 sub thaw
1832 {
1833   croak ("Thaw not implemented");
1834 }
1835
1836
1837 sub freezequote
1838 {
1839   my $s = shift;
1840   $s =~ s/\\/\\\\/g;
1841   $s =~ s/\n/\\n/g;
1842   return $s;
1843 }
1844
1845
1846 sub freezeunquote
1847 {
1848   my $s = shift;
1849   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1850   return $s;
1851 }
1852
1853
1854 sub srun
1855 {
1856   my $srunargs = shift;
1857   my $execargs = shift;
1858   my $opts = shift || {};
1859   my $stdin = shift;
1860   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1861
1862   $Data::Dumper::Terse = 1;
1863   $Data::Dumper::Indent = 0;
1864   my $show_cmd = Dumper($args);
1865   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1866   $show_cmd =~ s/\n/ /g;
1867   if ($opts->{fork}) {
1868     Log(undef, "starting: $show_cmd");
1869   } else {
1870     # This is a child process: parent is in charge of reading our
1871     # stderr and copying it to Log() if needed.
1872     warn "starting: $show_cmd\n";
1873   }
1874
1875   if (defined $stdin) {
1876     my $child = open STDIN, "-|";
1877     defined $child or die "no fork: $!";
1878     if ($child == 0) {
1879       print $stdin or die $!;
1880       close STDOUT or die $!;
1881       exit 0;
1882     }
1883   }
1884
1885   return system (@$args) if $opts->{fork};
1886
1887   exec @$args;
1888   warn "ENV size is ".length(join(" ",%ENV));
1889   die "exec failed: $!: @$args";
1890 }
1891
1892
1893 sub ban_node_by_slot {
1894   # Don't start any new jobsteps on this node for 60 seconds
1895   my $slotid = shift;
1896   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1897   $slot[$slotid]->{node}->{hold_count}++;
1898   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1899 }
1900
1901 sub must_lock_now
1902 {
1903   my ($lockfile, $error_message) = @_;
1904   open L, ">", $lockfile or croak("$lockfile: $!");
1905   if (!flock L, LOCK_EX|LOCK_NB) {
1906     croak("Can't lock $lockfile: $error_message\n");
1907   }
1908 }
1909
1910 sub find_docker_image {
1911   # Given a Keep locator, check to see if it contains a Docker image.
1912   # If so, return its stream name and Docker hash.
1913   # If not, return undef for both values.
1914   my $locator = shift;
1915   my ($streamname, $filename);
1916   my $image = api_call("collections/get", uuid => $locator);
1917   if ($image) {
1918     foreach my $line (split(/\n/, $image->{manifest_text})) {
1919       my @tokens = split(/\s+/, $line);
1920       next if (!@tokens);
1921       $streamname = shift(@tokens);
1922       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1923         if (defined($filename)) {
1924           return (undef, undef);  # More than one file in the Collection.
1925         } else {
1926           $filename = (split(/:/, $filedata, 3))[2];
1927         }
1928       }
1929     }
1930   }
1931   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1932     return ($streamname, $1);
1933   } else {
1934     return (undef, undef);
1935   }
1936 }
1937
1938 sub retry_count {
1939   # Calculate the number of times an operation should be retried,
1940   # assuming exponential backoff, and that we're willing to retry as
1941   # long as tasks have been running.  Enforce a minimum of 3 retries.
1942   my ($starttime, $endtime, $timediff, $retries);
1943   if (@jobstep) {
1944     $starttime = $jobstep[0]->{starttime};
1945     $endtime = $jobstep[-1]->{finishtime};
1946   }
1947   if (!defined($starttime)) {
1948     $timediff = 0;
1949   } elsif (!defined($endtime)) {
1950     $timediff = time - $starttime;
1951   } else {
1952     $timediff = ($endtime - $starttime) - (time - $endtime);
1953   }
1954   if ($timediff > 0) {
1955     $retries = int(log($timediff) / log(2));
1956   } else {
1957     $retries = 1;  # Use the minimum.
1958   }
1959   return ($retries > 3) ? $retries : 3;
1960 }
1961
1962 sub retry_op {
1963   # Pass in two function references.
1964   # This method will be called with the remaining arguments.
1965   # If it dies, retry it with exponential backoff until it succeeds,
1966   # or until the current retry_count is exhausted.  After each failure
1967   # that can be retried, the second function will be called with
1968   # the current try count (0-based), next try time, and error message.
1969   my $operation = shift;
1970   my $retry_callback = shift;
1971   my $retries = retry_count();
1972   foreach my $try_count (0..$retries) {
1973     my $next_try = time + (2 ** $try_count);
1974     my $result = eval { $operation->(@_); };
1975     if (!$@) {
1976       return $result;
1977     } elsif ($try_count < $retries) {
1978       $retry_callback->($try_count, $next_try, $@);
1979       my $sleep_time = $next_try - time;
1980       sleep($sleep_time) if ($sleep_time > 0);
1981     }
1982   }
1983   # Ensure the error message ends in a newline, so Perl doesn't add
1984   # retry_op's line number to it.
1985   chomp($@);
1986   die($@ . "\n");
1987 }
1988
1989 sub api_call {
1990   # Pass in a /-separated API method name, and arguments for it.
1991   # This function will call that method, retrying as needed until
1992   # the current retry_count is exhausted, with a log on the first failure.
1993   my $method_name = shift;
1994   my $log_api_retry = sub {
1995     my ($try_count, $next_try_at, $errmsg) = @_;
1996     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1997     $errmsg =~ s/\s/ /g;
1998     $errmsg =~ s/\s+$//;
1999     my $retry_msg;
2000     if ($next_try_at < time) {
2001       $retry_msg = "Retrying.";
2002     } else {
2003       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2004       $retry_msg = "Retrying at $next_try_fmt.";
2005     }
2006     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2007   };
2008   my $method = $arv;
2009   foreach my $key (split(/\//, $method_name)) {
2010     $method = $method->{$key};
2011   }
2012   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2013 }
2014
2015 sub exit_status_s {
2016   # Given a $?, return a human-readable exit code string like "0" or
2017   # "1" or "0 with signal 1" or "1 with signal 11".
2018   my $exitcode = shift;
2019   my $s = $exitcode >> 8;
2020   if ($exitcode & 0x7f) {
2021     $s .= " with signal " . ($exitcode & 0x7f);
2022   }
2023   if ($exitcode & 0x80) {
2024     $s .= " with core dump";
2025   }
2026   return $s;
2027 }
2028
2029 sub handle_readall {
2030   # Pass in a glob reference to a file handle.
2031   # Read all its contents and return them as a string.
2032   my $fh_glob_ref = shift;
2033   local $/ = undef;
2034   return <$fh_glob_ref>;
2035 }
2036
2037 sub tar_filename_n {
2038   my $n = shift;
2039   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2040 }
2041
2042 sub add_git_archive {
2043   # Pass in a git archive command as a string or list, a la system().
2044   # This method will save its output to be included in the archive sent to the
2045   # build script.
2046   my $git_input;
2047   $git_tar_count++;
2048   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2049     croak("Failed to save git archive: $!");
2050   }
2051   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2052   close($git_input);
2053   waitpid($git_pid, 0);
2054   close(GIT_ARCHIVE);
2055   if ($?) {
2056     croak("Failed to save git archive: git exited " . exit_status_s($?));
2057   }
2058 }
2059
2060 sub combined_git_archive {
2061   # Combine all saved tar archives into a single archive, then return its
2062   # contents in a string.  Return undef if no archives have been saved.
2063   if ($git_tar_count < 1) {
2064     return undef;
2065   }
2066   my $base_tar_name = tar_filename_n(1);
2067   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2068     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2069     if ($tar_exit != 0) {
2070       croak("Error preparing build archive: tar -A exited " .
2071             exit_status_s($tar_exit));
2072     }
2073   }
2074   if (!open(GIT_TAR, "<", $base_tar_name)) {
2075     croak("Could not open build archive: $!");
2076   }
2077   my $tar_contents = handle_readall(\*GIT_TAR);
2078   close(GIT_TAR);
2079   return $tar_contents;
2080 }
2081
2082 sub set_nonblocking {
2083   my $fh = shift;
2084   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2085   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2086 }
2087
2088 __DATA__
2089 #!/usr/bin/env perl
2090 #
2091 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2092 # server invokes this script on individual compute nodes, or localhost if we're
2093 # running a job locally.  It gets called in two modes:
2094 #
2095 # * No arguments: Installation mode.  Read a tar archive from the DATA
2096 #   file handle; it includes the Crunch script's source code, and
2097 #   maybe SDKs as well.  Those should be installed in the proper
2098 #   locations.  This runs outside of any Docker container, so don't try to
2099 #   introspect Crunch's runtime environment.
2100 #
2101 # * With arguments: Crunch script run mode.  This script should set up the
2102 #   environment, then run the command specified in the arguments.  This runs
2103 #   inside any Docker container.
2104
2105 use Fcntl ':flock';
2106 use File::Path qw( make_path remove_tree );
2107 use POSIX qw(getcwd);
2108
2109 use constant TASK_TEMPFAIL => 111;
2110
2111 # Map SDK subdirectories to the path environments they belong to.
2112 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2113
2114 my $destdir = $ENV{"CRUNCH_SRC"};
2115 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2116 my $repo = $ENV{"CRUNCH_SRC_URL"};
2117 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2118 my $job_work = $ENV{"JOB_WORK"};
2119 my $task_work = $ENV{"TASK_WORK"};
2120
2121 open(STDOUT_ORIG, ">&", STDOUT);
2122 open(STDERR_ORIG, ">&", STDERR);
2123
2124 for my $dir ($destdir, $job_work, $task_work) {
2125   if ($dir) {
2126     make_path $dir;
2127     -e $dir or die "Failed to create temporary directory ($dir): $!";
2128   }
2129 }
2130
2131 if ($task_work) {
2132   remove_tree($task_work, {keep_root => 1});
2133 }
2134
2135 ### Crunch script run mode
2136 if (@ARGV) {
2137   # We want to do routine logging during task 0 only.  This gives the user
2138   # the information they need, but avoids repeating the information for every
2139   # task.
2140   my $Log;
2141   if ($ENV{TASK_SEQUENCE} eq "0") {
2142     $Log = sub {
2143       my $msg = shift;
2144       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2145     };
2146   } else {
2147     $Log = sub { };
2148   }
2149
2150   my $python_src = "$install_dir/python";
2151   my $venv_dir = "$job_work/.arvados.venv";
2152   my $venv_built = -e "$venv_dir/bin/activate";
2153   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2154     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2155                  "--python=python2.7", $venv_dir);
2156     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2157     $venv_built = 1;
2158     $Log->("Built Python SDK virtualenv");
2159   }
2160
2161   my $pip_bin = "pip";
2162   if ($venv_built) {
2163     $Log->("Running in Python SDK virtualenv");
2164     $pip_bin = "$venv_dir/bin/pip";
2165     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2166     @ARGV = ("/bin/sh", "-ec",
2167              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2168   } elsif (-d $python_src) {
2169     $Log->("Warning: virtualenv not found inside Docker container default " .
2170            "\$PATH. Can't install Python SDK.");
2171   }
2172
2173   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2174   if ($pkgs) {
2175     $Log->("Using Arvados SDK:");
2176     foreach my $line (split /\n/, $pkgs) {
2177       $Log->($line);
2178     }
2179   } else {
2180     $Log->("Arvados SDK packages not found");
2181   }
2182
2183   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2184     my $sdk_path = "$install_dir/$sdk_dir";
2185     if (-d $sdk_path) {
2186       if ($ENV{$sdk_envkey}) {
2187         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2188       } else {
2189         $ENV{$sdk_envkey} = $sdk_path;
2190       }
2191       $Log->("Arvados SDK added to %s", $sdk_envkey);
2192     }
2193   }
2194
2195   exec(@ARGV);
2196   die "Cannot exec `@ARGV`: $!";
2197 }
2198
2199 ### Installation mode
2200 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2201 flock L, LOCK_EX;
2202 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2203   # This exact git archive (source + arvados sdk) is already installed
2204   # here, so there's no need to reinstall it.
2205
2206   # We must consume our DATA section, though: otherwise the process
2207   # feeding it to us will get SIGPIPE.
2208   my $buf;
2209   while (read(DATA, $buf, 65536)) { }
2210
2211   exit(0);
2212 }
2213
2214 unlink "$destdir.archive_hash";
2215 mkdir $destdir;
2216
2217 do {
2218   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2219   local $SIG{PIPE} = "IGNORE";
2220   warn "Extracting archive: $archive_hash\n";
2221   # --ignore-zeros is necessary sometimes: depending on how much NUL
2222   # padding tar -A put on our combined archive (which in turn depends
2223   # on the length of the component archives) tar without
2224   # --ignore-zeros will exit before consuming stdin and cause close()
2225   # to fail on the resulting SIGPIPE.
2226   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2227     die "Error launching 'tar -xC $destdir': $!";
2228   }
2229   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2230   # get SIGPIPE.  We must feed it data incrementally.
2231   my $tar_input;
2232   while (read(DATA, $tar_input, 65536)) {
2233     print TARX $tar_input;
2234   }
2235   if(!close(TARX)) {
2236     die "'tar -xC $destdir' exited $?: $!";
2237   }
2238 };
2239
2240 mkdir $install_dir;
2241
2242 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2243 if (-d $sdk_root) {
2244   foreach my $sdk_lang (("python",
2245                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2246     if (-d "$sdk_root/$sdk_lang") {
2247       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2248         die "Failed to install $sdk_lang SDK: $!";
2249       }
2250     }
2251   }
2252 }
2253
2254 my $python_dir = "$install_dir/python";
2255 if ((-d $python_dir) and can_run("python2.7")) {
2256   open(my $egg_info_pipe, "-|",
2257        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2258   my @egg_info_errors = <$egg_info_pipe>;
2259   close($egg_info_pipe);
2260
2261   if ($?) {
2262     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2263       # egg_info apparently failed because it couldn't ask git for a build tag.
2264       # Specify no build tag.
2265       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2266       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2267       close($pysdk_cfg);
2268     } else {
2269       my $egg_info_exit = $? >> 8;
2270       foreach my $errline (@egg_info_errors) {
2271         warn $errline;
2272       }
2273       warn "python setup.py egg_info failed: exit $egg_info_exit";
2274       exit ($egg_info_exit || 1);
2275     }
2276   }
2277 }
2278
2279 # Hide messages from the install script (unless it fails: shell_or_die
2280 # will show $destdir.log in that case).
2281 open(STDOUT, ">>", "$destdir.log");
2282 open(STDERR, ">&", STDOUT);
2283
2284 if (-e "$destdir/crunch_scripts/install") {
2285     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2286 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2287     # Old version
2288     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2289 } elsif (-e "./install.sh") {
2290     shell_or_die (undef, "./install.sh", $install_dir);
2291 }
2292
2293 if ($archive_hash) {
2294     unlink "$destdir.archive_hash.new";
2295     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2296     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2297 }
2298
2299 close L;
2300
2301 sub can_run {
2302   my $command_name = shift;
2303   open(my $which, "-|", "which", $command_name);
2304   while (<$which>) { }
2305   close($which);
2306   return ($? == 0);
2307 }
2308
2309 sub shell_or_die
2310 {
2311   my $exitcode = shift;
2312
2313   if ($ENV{"DEBUG"}) {
2314     print STDERR "@_\n";
2315   }
2316   if (system (@_) != 0) {
2317     my $err = $!;
2318     my $code = $?;
2319     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2320     open STDERR, ">&STDERR_ORIG";
2321     system ("cat $destdir.log >&2");
2322     warn "@_ failed ($err): $exitstatus";
2323     if (defined($exitcode)) {
2324       exit $exitcode;
2325     }
2326     else {
2327       exit (($code >> 8) || 1);
2328     }
2329   }
2330 }
2331
2332 __DATA__