Merge branch 'master' into 7490-datamanager-dont-die-return-error
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 GetOptions('force-unlock' => \$force_unlock,
131            'git-dir=s' => \$git_dir,
132            'job=s' => \$jobspec,
133            'job-api-token=s' => \$job_api_token,
134            'no-clear-tmp' => \$no_clear_tmp,
135            'resume-stash=s' => \$resume_stash,
136            'docker-bin=s' => \$docker_bin,
137     );
138
139 if (defined $job_api_token) {
140   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
141 }
142
143 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
144
145
146 $SIG{'USR1'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 1;
149 };
150 $SIG{'USR2'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 0;
153 };
154
155 my $arv = Arvados->new('apiVersion' => 'v1');
156
157 my $Job;
158 my $job_id;
159 my $dbh;
160 my $sth;
161 my @jobstep;
162
163 my $local_job;
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   $local_job = 0;
169 }
170 else
171 {
172   $local_job = JSON::decode_json($jobspec);
173 }
174
175
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely
178 # misconfigured.
179 my $cmd = ['true'];
180 if (($Job || $local_job)->{docker_image_locator}) {
181   $cmd = [$docker_bin, 'ps', '-q'];
182 }
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
185      $cmd,
186      {fork => 1});
187 if ($? != 0) {
188   Log(undef, "Sanity check failed: ".exit_status_s($?));
189   exit EX_TEMPFAIL;
190 }
191 Log(undef, "Sanity check OK");
192
193
194 my $User = api_call("users/current");
195
196 if (!$local_job) {
197   if (!$force_unlock) {
198     # Claim this job, and make sure nobody else does
199     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
200     if ($@) {
201       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
202       exit EX_TEMPFAIL;
203     };
204   }
205 }
206 else
207 {
208   if (!$resume_stash)
209   {
210     map { croak ("No $_ specified") unless $local_job->{$_} }
211     qw(script script_version script_parameters);
212   }
213
214   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
215   $local_job->{'started_at'} = gmtime;
216   $local_job->{'state'} = 'Running';
217
218   $Job = api_call("jobs/create", job => $local_job);
219 }
220 $job_id = $Job->{'uuid'};
221
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
224
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
228
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
230 if ($? == 0) {
231   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232   chomp($gem_versions);
233   chop($gem_versions);  # Closing parentheses
234 } else {
235   $gem_versions = "";
236 }
237 Log(undef,
238     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
239
240 Log (undef, "check slurm allocation");
241 my @slot;
242 my @node;
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
244 my @sinfo;
245 if (!$have_slurm)
246 {
247   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248   push @sinfo, "$localcpus localhost";
249 }
250 if (exists $ENV{SLURM_NODELIST})
251 {
252   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
253 }
254 foreach (@sinfo)
255 {
256   my ($ncpus, $slurm_nodelist) = split;
257   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
258
259   my @nodelist;
260   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
261   {
262     my $nodelist = $1;
263     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
264     {
265       my $ranges = $1;
266       foreach (split (",", $ranges))
267       {
268         my ($a, $b);
269         if (/(\d+)-(\d+)/)
270         {
271           $a = $1;
272           $b = $2;
273         }
274         else
275         {
276           $a = $_;
277           $b = $_;
278         }
279         push @nodelist, map {
280           my $n = $nodelist;
281           $n =~ s/\[[-,\d]+\]/$_/;
282           $n;
283         } ($a..$b);
284       }
285     }
286     else
287     {
288       push @nodelist, $nodelist;
289     }
290   }
291   foreach my $nodename (@nodelist)
292   {
293     Log (undef, "node $nodename - $ncpus slots");
294     my $node = { name => $nodename,
295                  ncpus => $ncpus,
296                  # The number of consecutive times a task has been dispatched
297                  # to this node and failed.
298                  losing_streak => 0,
299                  # The number of consecutive times that SLURM has reported
300                  # a node failure since the last successful task.
301                  fail_count => 0,
302                  # Don't dispatch work to this node until this time
303                  # (in seconds since the epoch) has passed.
304                  hold_until => 0 };
305     foreach my $cpu (1..$ncpus)
306     {
307       push @slot, { node => $node,
308                     cpu => $cpu };
309     }
310   }
311   push @node, @nodelist;
312 }
313
314
315
316 # Ensure that we get one jobstep running on each allocated node before
317 # we start overloading nodes with concurrent steps
318
319 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
320
321
322 $Job->update_attributes(
323   'tasks_summary' => { 'failed' => 0,
324                        'todo' => 1,
325                        'running' => 0,
326                        'done' => 0 });
327
328 Log (undef, "start");
329 $SIG{'INT'} = sub { $main::please_freeze = 1; };
330 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
331 $SIG{'TERM'} = \&croak;
332 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
333 $SIG{'ALRM'} = sub { $main::please_info = 1; };
334 $SIG{'CONT'} = sub { $main::please_continue = 1; };
335 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
336
337 $main::please_freeze = 0;
338 $main::please_info = 0;
339 $main::please_continue = 0;
340 $main::please_refresh = 0;
341 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
342
343 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
344 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
345 $ENV{"JOB_UUID"} = $job_id;
346
347
348 my @jobstep_todo = ();
349 my @jobstep_done = ();
350 my @jobstep_tomerge = ();
351 my $jobstep_tomerge_level = 0;
352 my $squeue_checked = 0;
353 my $latest_refresh = scalar time;
354
355
356
357 if (defined $Job->{thawedfromkey})
358 {
359   thaw ($Job->{thawedfromkey});
360 }
361 else
362 {
363   my $first_task = api_call("job_tasks/create", job_task => {
364     'job_uuid' => $Job->{'uuid'},
365     'sequence' => 0,
366     'qsequence' => 0,
367     'parameters' => {},
368   });
369   push @jobstep, { 'level' => 0,
370                    'failures' => 0,
371                    'arvados_task' => $first_task,
372                  };
373   push @jobstep_todo, 0;
374 }
375
376
377 if (!$have_slurm)
378 {
379   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
380 }
381
382 my $build_script = handle_readall(\*DATA);
383 my $nodelist = join(",", @node);
384 my $git_tar_count = 0;
385
386 if (!defined $no_clear_tmp) {
387   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
388   Log (undef, "Clean work dirs");
389
390   my $cleanpid = fork();
391   if ($cleanpid == 0)
392   {
393     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
394     # Then clean up work directories.
395     # TODO: When #5036 is done and widely deployed, we can limit mount's
396     # -t option to simply fuse.keep.
397     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
399     exit (1);
400   }
401   while (1)
402   {
403     last if $cleanpid == waitpid (-1, WNOHANG);
404     freeze_if_want_freeze ($cleanpid);
405     select (undef, undef, undef, 0.1);
406   }
407   if ($?) {
408     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
409     exit(EX_RETRY_UNLOCKED);
410   }
411 }
412
413 # If this job requires a Docker image, install that.
414 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
415 if ($docker_locator = $Job->{docker_image_locator}) {
416   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
417   if (!$docker_hash)
418   {
419     croak("No Docker image hash found from locator $docker_locator");
420   }
421   $docker_stream =~ s/^\.//;
422   my $docker_install_script = qq{
423 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
424     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
425 fi
426 };
427   my $docker_pid = fork();
428   if ($docker_pid == 0)
429   {
430     srun (["srun", "--nodelist=" . join(',', @node)],
431           ["/bin/sh", "-ec", $docker_install_script]);
432     exit ($?);
433   }
434   while (1)
435   {
436     last if $docker_pid == waitpid (-1, WNOHANG);
437     freeze_if_want_freeze ($docker_pid);
438     select (undef, undef, undef, 0.1);
439   }
440   if ($? != 0)
441   {
442     croak("Installing Docker image from $docker_locator exited "
443           .exit_status_s($?));
444   }
445
446   # Determine whether this version of Docker supports memory+swap limits.
447   srun(["srun", "--nodelist=" . $node[0]],
448        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
449       {fork => 1});
450   $docker_limitmem = ($? == 0);
451
452   # Find a non-root Docker user to use.
453   # Tries the default user for the container, then 'crunch', then 'nobody',
454   # testing for whether the actual user id is non-zero.  This defends against
455   # mistakes but not malice, but we intend to harden the security in the future
456   # so we don't want anyone getting used to their jobs running as root in their
457   # Docker containers.
458   my @tryusers = ("", "crunch", "nobody");
459   foreach my $try_user (@tryusers) {
460     my $try_user_arg;
461     if ($try_user eq "") {
462       Log(undef, "Checking if container default user is not UID 0");
463       $try_user_arg = "";
464     } else {
465       Log(undef, "Checking if user '$try_user' is not UID 0");
466       $try_user_arg = "--user=$try_user";
467     }
468     srun(["srun", "--nodelist=" . $node[0]],
469          ["/bin/sh", "-ec",
470           "a=`$docker_bin run $try_user_arg $docker_hash id --user` && " .
471           " test \$a -ne 0"],
472          {fork => 1});
473     if ($? == 0) {
474       $dockeruserarg = $try_user_arg;
475       if ($try_user eq "") {
476         Log(undef, "Container will run with default user");
477       } else {
478         Log(undef, "Container will run with $dockeruserarg");
479       }
480       last;
481     }
482   }
483
484   if (!defined $dockeruserarg) {
485     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
486   }
487
488   if ($Job->{arvados_sdk_version}) {
489     # The job also specifies an Arvados SDK version.  Add the SDKs to the
490     # tar file for the build script to install.
491     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
492                        $Job->{arvados_sdk_version}));
493     add_git_archive("git", "--git-dir=$git_dir", "archive",
494                     "--prefix=.arvados.sdk/",
495                     $Job->{arvados_sdk_version}, "sdk");
496   }
497 }
498
499 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
500   # If script_version looks like an absolute path, *and* the --git-dir
501   # argument was not given -- which implies we were not invoked by
502   # crunch-dispatch -- we will use the given path as a working
503   # directory instead of resolving script_version to a git commit (or
504   # doing anything else with git).
505   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
506   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
507 }
508 else {
509   # Resolve the given script_version to a git commit sha1. Also, if
510   # the repository is remote, clone it into our local filesystem: this
511   # ensures "git archive" will work, and is necessary to reliably
512   # resolve a symbolic script_version like "master^".
513   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
514
515   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
516
517   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
518
519   # If we're running under crunch-dispatch, it will have already
520   # pulled the appropriate source tree into its own repository, and
521   # given us that repo's path as $git_dir.
522   #
523   # If we're running a "local" job, we might have to fetch content
524   # from a remote repository.
525   #
526   # (Currently crunch-dispatch gives a local path with --git-dir, but
527   # we might as well accept URLs there too in case it changes its
528   # mind.)
529   my $repo = $git_dir || $Job->{'repository'};
530
531   # Repository can be remote or local. If remote, we'll need to fetch it
532   # to a local dir before doing `git log` et al.
533   my $repo_location;
534
535   if ($repo =~ m{://|^[^/]*:}) {
536     # $repo is a git url we can clone, like git:// or https:// or
537     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
538     # not recognized here because distinguishing that from a local
539     # path is too fragile. If you really need something strange here,
540     # use the ssh:// form.
541     $repo_location = 'remote';
542   } elsif ($repo =~ m{^\.*/}) {
543     # $repo is a local path to a git index. We'll also resolve ../foo
544     # to ../foo/.git if the latter is a directory. To help
545     # disambiguate local paths from named hosted repositories, this
546     # form must be given as ./ or ../ if it's a relative path.
547     if (-d "$repo/.git") {
548       $repo = "$repo/.git";
549     }
550     $repo_location = 'local';
551   } else {
552     # $repo is none of the above. It must be the name of a hosted
553     # repository.
554     my $arv_repo_list = api_call("repositories/list",
555                                  'filters' => [['name','=',$repo]]);
556     my @repos_found = @{$arv_repo_list->{'items'}};
557     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
558     if ($n_found > 0) {
559       Log(undef, "Repository '$repo' -> "
560           . join(", ", map { $_->{'uuid'} } @repos_found));
561     }
562     if ($n_found != 1) {
563       croak("Error: Found $n_found repositories with name '$repo'.");
564     }
565     $repo = $repos_found[0]->{'fetch_url'};
566     $repo_location = 'remote';
567   }
568   Log(undef, "Using $repo_location repository '$repo'");
569   $ENV{"CRUNCH_SRC_URL"} = $repo;
570
571   # Resolve given script_version (we'll call that $treeish here) to a
572   # commit sha1 ($commit).
573   my $treeish = $Job->{'script_version'};
574   my $commit;
575   if ($repo_location eq 'remote') {
576     # We minimize excess object-fetching by re-using the same bare
577     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
578     # just keep adding remotes to it as needed.
579     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
580     my $gitcmd = "git --git-dir=\Q$local_repo\E";
581
582     # Set up our local repo for caching remote objects, making
583     # archives, etc.
584     if (!-d $local_repo) {
585       make_path($local_repo) or croak("Error: could not create $local_repo");
586     }
587     # This works (exits 0 and doesn't delete fetched objects) even
588     # if $local_repo is already initialized:
589     `$gitcmd init --bare`;
590     if ($?) {
591       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
592     }
593
594     # If $treeish looks like a hash (or abbrev hash) we look it up in
595     # our local cache first, since that's cheaper. (We don't want to
596     # do that with tags/branches though -- those change over time, so
597     # they should always be resolved by the remote repo.)
598     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
599       # Hide stderr because it's normal for this to fail:
600       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
601       if ($? == 0 &&
602           # Careful not to resolve a branch named abcdeff to commit 1234567:
603           $sha1 =~ /^$treeish/ &&
604           $sha1 =~ /^([0-9a-f]{40})$/s) {
605         $commit = $1;
606         Log(undef, "Commit $commit already present in $local_repo");
607       }
608     }
609
610     if (!defined $commit) {
611       # If $treeish isn't just a hash or abbrev hash, or isn't here
612       # yet, we need to fetch the remote to resolve it correctly.
613
614       # First, remove all local heads. This prevents a name that does
615       # not exist on the remote from resolving to (or colliding with)
616       # a previously fetched branch or tag (possibly from a different
617       # remote).
618       remove_tree("$local_repo/refs/heads", {keep_root => 1});
619
620       Log(undef, "Fetching objects from $repo to $local_repo");
621       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
622       if ($?) {
623         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
624       }
625     }
626
627     # Now that the data is all here, we will use our local repo for
628     # the rest of our git activities.
629     $repo = $local_repo;
630   }
631
632   my $gitcmd = "git --git-dir=\Q$repo\E";
633   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
634   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
635     croak("`$gitcmd rev-list` exited "
636           .exit_status_s($?)
637           .", '$treeish' not found, giving up");
638   }
639   $commit = $1;
640   Log(undef, "Version $treeish is commit $commit");
641
642   if ($commit ne $Job->{'script_version'}) {
643     # Record the real commit id in the database, frozentokey, logs,
644     # etc. -- instead of an abbreviation or a branch name which can
645     # become ambiguous or point to a different commit in the future.
646     if (!$Job->update_attributes('script_version' => $commit)) {
647       croak("Error: failed to update job's script_version attribute");
648     }
649   }
650
651   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
652   add_git_archive("$gitcmd archive ''\Q$commit\E");
653 }
654
655 my $git_archive = combined_git_archive();
656 if (!defined $git_archive) {
657   Log(undef, "Skip install phase (no git archive)");
658   if ($have_slurm) {
659     Log(undef, "Warning: This probably means workers have no source tree!");
660   }
661 }
662 else {
663   my $install_exited;
664   my $install_script_tries_left = 3;
665   for (my $attempts = 0; $attempts < 3; $attempts++) {
666     Log(undef, "Run install script on all workers");
667
668     my @srunargs = ("srun",
669                     "--nodelist=$nodelist",
670                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
671     my @execargs = ("sh", "-c",
672                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
673
674     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
675     my ($install_stderr_r, $install_stderr_w);
676     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
677     set_nonblocking($install_stderr_r);
678     my $installpid = fork();
679     if ($installpid == 0)
680     {
681       close($install_stderr_r);
682       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
683       open(STDOUT, ">&", $install_stderr_w);
684       open(STDERR, ">&", $install_stderr_w);
685       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
686       exit (1);
687     }
688     close($install_stderr_w);
689     # Tell freeze_if_want_freeze how to kill the child, otherwise the
690     # "waitpid(installpid)" loop won't get interrupted by a freeze:
691     $proc{$installpid} = {};
692     my $stderr_buf = '';
693     # Track whether anything appears on stderr other than slurm errors
694     # ("srun: ...") and the "starting: ..." message printed by the
695     # srun subroutine itself:
696     my $stderr_anything_from_script = 0;
697     my $match_our_own_errors = '^(srun: error: |starting: \[)';
698     while ($installpid != waitpid(-1, WNOHANG)) {
699       freeze_if_want_freeze ($installpid);
700       # Wait up to 0.1 seconds for something to appear on stderr, then
701       # do a non-blocking read.
702       my $bits = fhbits($install_stderr_r);
703       select ($bits, undef, $bits, 0.1);
704       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
705       {
706         while ($stderr_buf =~ /^(.*?)\n/) {
707           my $line = $1;
708           substr $stderr_buf, 0, 1+length($line), "";
709           Log(undef, "stderr $line");
710           if ($line !~ /$match_our_own_errors/) {
711             $stderr_anything_from_script = 1;
712           }
713         }
714       }
715     }
716     delete $proc{$installpid};
717     $install_exited = $?;
718     close($install_stderr_r);
719     if (length($stderr_buf) > 0) {
720       if ($stderr_buf !~ /$match_our_own_errors/) {
721         $stderr_anything_from_script = 1;
722       }
723       Log(undef, "stderr $stderr_buf")
724     }
725
726     Log (undef, "Install script exited ".exit_status_s($install_exited));
727     last if $install_exited == 0 || $main::please_freeze;
728     # If the install script fails but doesn't print an error message,
729     # the next thing anyone is likely to do is just run it again in
730     # case it was a transient problem like "slurm communication fails
731     # because the network isn't reliable enough". So we'll just do
732     # that ourselves (up to 3 attempts in total). OTOH, if there is an
733     # error message, the problem is more likely to have a real fix and
734     # we should fail the job so the fixing process can start, instead
735     # of doing 2 more attempts.
736     last if $stderr_anything_from_script;
737   }
738
739   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
740     unlink($tar_filename);
741   }
742
743   if ($install_exited != 0) {
744     croak("Giving up");
745   }
746 }
747
748 foreach (qw (script script_version script_parameters runtime_constraints))
749 {
750   Log (undef,
751        "$_ " .
752        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
753 }
754 foreach (split (/\n/, $Job->{knobs}))
755 {
756   Log (undef, "knob " . $_);
757 }
758
759
760
761 $main::success = undef;
762
763
764
765 ONELEVEL:
766
767 my $thisround_succeeded = 0;
768 my $thisround_failed = 0;
769 my $thisround_failed_multiple = 0;
770 my $working_slot_count = scalar(@slot);
771
772 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
773                        or $a <=> $b } @jobstep_todo;
774 my $level = $jobstep[$jobstep_todo[0]]->{level};
775
776 my $initial_tasks_this_level = 0;
777 foreach my $id (@jobstep_todo) {
778   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
779 }
780
781 # If the number of tasks scheduled at this level #T is smaller than the number
782 # of slots available #S, only use the first #T slots, or the first slot on
783 # each node, whichever number is greater.
784 #
785 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
786 # based on these numbers.  Using fewer slots makes more resources available
787 # to each individual task, which should normally be a better strategy when
788 # there are fewer of them running with less parallelism.
789 #
790 # Note that this calculation is not redone if the initial tasks at
791 # this level queue more tasks at the same level.  This may harm
792 # overall task throughput for that level.
793 my @freeslot;
794 if ($initial_tasks_this_level < @node) {
795   @freeslot = (0..$#node);
796 } elsif ($initial_tasks_this_level < @slot) {
797   @freeslot = (0..$initial_tasks_this_level - 1);
798 } else {
799   @freeslot = (0..$#slot);
800 }
801 my $round_num_freeslots = scalar(@freeslot);
802
803 my %round_max_slots = ();
804 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
805   my $this_slot = $slot[$freeslot[$ii]];
806   my $node_name = $this_slot->{node}->{name};
807   $round_max_slots{$node_name} ||= $this_slot->{cpu};
808   last if (scalar(keys(%round_max_slots)) >= @node);
809 }
810
811 Log(undef, "start level $level with $round_num_freeslots slots");
812 my @holdslot;
813 my %reader;
814 my $progress_is_dirty = 1;
815 my $progress_stats_updated = 0;
816
817 update_progress_stats();
818
819
820 THISROUND:
821 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
822 {
823   # Don't create new tasks if we already know the job's final result.
824   last if defined($main::success);
825
826   my $id = $jobstep_todo[$todo_ptr];
827   my $Jobstep = $jobstep[$id];
828   if ($Jobstep->{level} != $level)
829   {
830     next;
831   }
832
833   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
834   set_nonblocking($reader{$id});
835
836   my $childslot = $freeslot[0];
837   my $childnode = $slot[$childslot]->{node};
838   my $childslotname = join (".",
839                             $slot[$childslot]->{node}->{name},
840                             $slot[$childslot]->{cpu});
841
842   my $childpid = fork();
843   if ($childpid == 0)
844   {
845     $SIG{'INT'} = 'DEFAULT';
846     $SIG{'QUIT'} = 'DEFAULT';
847     $SIG{'TERM'} = 'DEFAULT';
848
849     foreach (values (%reader))
850     {
851       close($_);
852     }
853     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
854     open(STDOUT,">&writer");
855     open(STDERR,">&writer");
856
857     undef $dbh;
858     undef $sth;
859
860     delete $ENV{"GNUPGHOME"};
861     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
862     $ENV{"TASK_QSEQUENCE"} = $id;
863     $ENV{"TASK_SEQUENCE"} = $level;
864     $ENV{"JOB_SCRIPT"} = $Job->{script};
865     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
866       $param =~ tr/a-z/A-Z/;
867       $ENV{"JOB_PARAMETER_$param"} = $value;
868     }
869     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
870     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
871     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
872     $ENV{"HOME"} = $ENV{"TASK_WORK"};
873     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
874     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
875     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
876     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
877
878     $ENV{"GZIP"} = "-n";
879
880     my @srunargs = (
881       "srun",
882       "--nodelist=".$childnode->{name},
883       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
884       "--job-name=$job_id.$id.$$",
885         );
886
887     my $stdbuf = " stdbuf --output=0 --error=0 ";
888
889     my $command =
890         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
891         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
892         ."&& cd $ENV{CRUNCH_TMP} "
893         # These environment variables get used explicitly later in
894         # $command.  No tool is expected to read these values directly.
895         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
896         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
897         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
898         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
899     $command .= "&& exec arv-mount --by-pdh --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
900     if ($docker_hash)
901     {
902       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
903       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
904       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
905       $command .= "$docker_bin run --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
906       # We only set memory limits if Docker lets us limit both memory and swap.
907       # Memory limits alone have been supported longer, but subprocesses tend
908       # to get SIGKILL if they exceed that without any swap limit set.
909       # See #5642 for additional background.
910       if ($docker_limitmem) {
911         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
912       }
913
914       # The source tree and $destdir directory (which we have
915       # installed on the worker host) are available in the container,
916       # under the same path.
917       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
918       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
919
920       # Currently, we make arv-mount's mount point appear at /keep
921       # inside the container (instead of using the same path as the
922       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
923       # crunch scripts and utilities must not rely on this. They must
924       # use $TASK_KEEPMOUNT.
925       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
926       $ENV{TASK_KEEPMOUNT} = "/keep";
927
928       # TASK_WORK is almost exactly like a docker data volume: it
929       # starts out empty, is writable, and persists until no
930       # containers use it any more. We don't use --volumes-from to
931       # share it with other containers: it is only accessible to this
932       # task, and it goes away when this task stops.
933       #
934       # However, a docker data volume is writable only by root unless
935       # the mount point already happens to exist in the container with
936       # different permissions. Therefore, we [1] assume /tmp already
937       # exists in the image and is writable by the crunch user; [2]
938       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
939       # writable if they are created by docker while setting up the
940       # other --volumes); and [3] create $TASK_WORK inside the
941       # container using $build_script.
942       $command .= "--volume=/tmp ";
943       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
944       $ENV{"HOME"} = $ENV{"TASK_WORK"};
945       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
946
947       # TODO: Share a single JOB_WORK volume across all task
948       # containers on a given worker node, and delete it when the job
949       # ends (and, in case that doesn't work, when the next job
950       # starts).
951       #
952       # For now, use the same approach as TASK_WORK above.
953       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
954
955       while (my ($env_key, $env_val) = each %ENV)
956       {
957         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
958           $command .= "--env=\Q$env_key=$env_val\E ";
959         }
960       }
961       $command .= "--env=\QHOME=$ENV{HOME}\E ";
962       $command .= "\Q$docker_hash\E ";
963
964       if ($Job->{arvados_sdk_version}) {
965         $command .= $stdbuf;
966         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
967       } else {
968         $command .= "/bin/sh -c \'mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
969             "if which stdbuf >/dev/null ; then " .
970             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
971             " else " .
972             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
973             " fi\'";
974       }
975     } else {
976       # Non-docker run
977       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
978       $command .= $stdbuf;
979       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
980     }
981
982     my @execargs = ('bash', '-c', $command);
983     srun (\@srunargs, \@execargs, undef, $build_script);
984     # exec() failed, we assume nothing happened.
985     die "srun() failed on build script\n";
986   }
987   close("writer");
988   if (!defined $childpid)
989   {
990     close $reader{$id};
991     delete $reader{$id};
992     next;
993   }
994   shift @freeslot;
995   $proc{$childpid} = { jobstep => $id,
996                        time => time,
997                        slot => $childslot,
998                        jobstepname => "$job_id.$id.$childpid",
999                      };
1000   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1001   $slot[$childslot]->{pid} = $childpid;
1002
1003   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1004   Log ($id, "child $childpid started on $childslotname");
1005   $Jobstep->{starttime} = time;
1006   $Jobstep->{node} = $childnode->{name};
1007   $Jobstep->{slotindex} = $childslot;
1008   delete $Jobstep->{stderr};
1009   delete $Jobstep->{finishtime};
1010   delete $Jobstep->{tempfail};
1011
1012   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1013   $Jobstep->{'arvados_task'}->save;
1014
1015   splice @jobstep_todo, $todo_ptr, 1;
1016   --$todo_ptr;
1017
1018   $progress_is_dirty = 1;
1019
1020   while (!@freeslot
1021          ||
1022          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1023   {
1024     last THISROUND if $main::please_freeze;
1025     if ($main::please_info)
1026     {
1027       $main::please_info = 0;
1028       freeze();
1029       create_output_collection();
1030       save_meta(1);
1031       update_progress_stats();
1032     }
1033     my $gotsome
1034         = readfrompipes ()
1035         + reapchildren ();
1036     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1037     {
1038       check_refresh_wanted();
1039       check_squeue();
1040       update_progress_stats();
1041       select (undef, undef, undef, 0.1);
1042     }
1043     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1044     {
1045       update_progress_stats();
1046     }
1047     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1048                                         $_->{node}->{hold_count} < 4 } @slot);
1049     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1050         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1051     {
1052       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1053           .($thisround_failed+$thisround_succeeded)
1054           .") -- giving up on this round";
1055       Log (undef, $message);
1056       last THISROUND;
1057     }
1058
1059     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1060     for (my $i=$#freeslot; $i>=0; $i--) {
1061       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1062         push @holdslot, (splice @freeslot, $i, 1);
1063       }
1064     }
1065     for (my $i=$#holdslot; $i>=0; $i--) {
1066       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1067         push @freeslot, (splice @holdslot, $i, 1);
1068       }
1069     }
1070
1071     # give up if no nodes are succeeding
1072     if ($working_slot_count < 1) {
1073       Log(undef, "Every node has failed -- giving up");
1074       last THISROUND;
1075     }
1076   }
1077 }
1078
1079
1080 push @freeslot, splice @holdslot;
1081 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1082
1083
1084 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1085 while (%proc)
1086 {
1087   if ($main::please_continue) {
1088     $main::please_continue = 0;
1089     goto THISROUND;
1090   }
1091   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1092   readfrompipes ();
1093   if (!reapchildren())
1094   {
1095     check_refresh_wanted();
1096     check_squeue();
1097     update_progress_stats();
1098     select (undef, undef, undef, 0.1);
1099     killem (keys %proc) if $main::please_freeze;
1100   }
1101 }
1102
1103 update_progress_stats();
1104 freeze_if_want_freeze();
1105
1106
1107 if (!defined $main::success)
1108 {
1109   if (!@jobstep_todo) {
1110     $main::success = 1;
1111   } elsif ($working_slot_count < 1) {
1112     save_output_collection();
1113     save_meta();
1114     exit(EX_RETRY_UNLOCKED);
1115   } elsif ($thisround_succeeded == 0 &&
1116            ($thisround_failed == 0 || $thisround_failed > 4)) {
1117     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1118     Log (undef, $message);
1119     $main::success = 0;
1120   }
1121 }
1122
1123 goto ONELEVEL if !defined $main::success;
1124
1125
1126 release_allocation();
1127 freeze();
1128 my $collated_output = save_output_collection();
1129 Log (undef, "finish");
1130
1131 save_meta();
1132
1133 my $final_state;
1134 if ($collated_output && $main::success) {
1135   $final_state = 'Complete';
1136 } else {
1137   $final_state = 'Failed';
1138 }
1139 $Job->update_attributes('state' => $final_state);
1140
1141 exit (($final_state eq 'Complete') ? 0 : 1);
1142
1143
1144
1145 sub update_progress_stats
1146 {
1147   $progress_stats_updated = time;
1148   return if !$progress_is_dirty;
1149   my ($todo, $done, $running) = (scalar @jobstep_todo,
1150                                  scalar @jobstep_done,
1151                                  scalar keys(%proc));
1152   $Job->{'tasks_summary'} ||= {};
1153   $Job->{'tasks_summary'}->{'todo'} = $todo;
1154   $Job->{'tasks_summary'}->{'done'} = $done;
1155   $Job->{'tasks_summary'}->{'running'} = $running;
1156   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1157   Log (undef, "status: $done done, $running running, $todo todo");
1158   $progress_is_dirty = 0;
1159 }
1160
1161
1162
1163 sub reapchildren
1164 {
1165   my $pid = waitpid (-1, WNOHANG);
1166   return 0 if $pid <= 0;
1167
1168   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1169                   . "."
1170                   . $slot[$proc{$pid}->{slot}]->{cpu});
1171   my $jobstepid = $proc{$pid}->{jobstep};
1172   my $elapsed = time - $proc{$pid}->{time};
1173   my $Jobstep = $jobstep[$jobstepid];
1174
1175   my $childstatus = $?;
1176   my $exitvalue = $childstatus >> 8;
1177   my $exitinfo = "exit ".exit_status_s($childstatus);
1178   $Jobstep->{'arvados_task'}->reload;
1179   my $task_success = $Jobstep->{'arvados_task'}->{success};
1180
1181   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1182
1183   if (!defined $task_success) {
1184     # task did not indicate one way or the other --> fail
1185     Log($jobstepid, sprintf(
1186           "ERROR: Task process exited %d, but never updated its task record to indicate success and record its output.",
1187           exit_status_s($childstatus)));
1188     $Jobstep->{'arvados_task'}->{success} = 0;
1189     $Jobstep->{'arvados_task'}->save;
1190     $task_success = 0;
1191   }
1192
1193   if (!$task_success)
1194   {
1195     my $temporary_fail;
1196     $temporary_fail ||= $Jobstep->{tempfail};
1197     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1198
1199     ++$thisround_failed;
1200     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1201
1202     # Check for signs of a failed or misconfigured node
1203     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1204         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1205       # Don't count this against jobstep failure thresholds if this
1206       # node is already suspected faulty and srun exited quickly
1207       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1208           $elapsed < 5) {
1209         Log ($jobstepid, "blaming failure on suspect node " .
1210              $slot[$proc{$pid}->{slot}]->{node}->{name});
1211         $temporary_fail ||= 1;
1212       }
1213       ban_node_by_slot($proc{$pid}->{slot});
1214     }
1215
1216     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1217                              ++$Jobstep->{'failures'},
1218                              $temporary_fail ? 'temporary' : 'permanent',
1219                              $elapsed));
1220
1221     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1222       # Give up on this task, and the whole job
1223       $main::success = 0;
1224     }
1225     # Put this task back on the todo queue
1226     push @jobstep_todo, $jobstepid;
1227     $Job->{'tasks_summary'}->{'failed'}++;
1228   }
1229   else
1230   {
1231     ++$thisround_succeeded;
1232     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1233     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1234     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1235     push @jobstep_done, $jobstepid;
1236     Log ($jobstepid, "success in $elapsed seconds");
1237   }
1238   $Jobstep->{exitcode} = $childstatus;
1239   $Jobstep->{finishtime} = time;
1240   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1241   $Jobstep->{'arvados_task'}->save;
1242   process_stderr ($jobstepid, $task_success);
1243   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1244                            length($Jobstep->{'arvados_task'}->{output}),
1245                            $Jobstep->{'arvados_task'}->{output}));
1246
1247   close $reader{$jobstepid};
1248   delete $reader{$jobstepid};
1249   delete $slot[$proc{$pid}->{slot}]->{pid};
1250   push @freeslot, $proc{$pid}->{slot};
1251   delete $proc{$pid};
1252
1253   if ($task_success) {
1254     # Load new tasks
1255     my $newtask_list = [];
1256     my $newtask_results;
1257     do {
1258       $newtask_results = api_call(
1259         "job_tasks/list",
1260         'where' => {
1261           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1262         },
1263         'order' => 'qsequence',
1264         'offset' => scalar(@$newtask_list),
1265       );
1266       push(@$newtask_list, @{$newtask_results->{items}});
1267     } while (@{$newtask_results->{items}});
1268     foreach my $arvados_task (@$newtask_list) {
1269       my $jobstep = {
1270         'level' => $arvados_task->{'sequence'},
1271         'failures' => 0,
1272         'arvados_task' => $arvados_task
1273       };
1274       push @jobstep, $jobstep;
1275       push @jobstep_todo, $#jobstep;
1276     }
1277   }
1278
1279   $progress_is_dirty = 1;
1280   1;
1281 }
1282
1283 sub check_refresh_wanted
1284 {
1285   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1286   if (@stat && $stat[9] > $latest_refresh) {
1287     $latest_refresh = scalar time;
1288     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1289     for my $attr ('cancelled_at',
1290                   'cancelled_by_user_uuid',
1291                   'cancelled_by_client_uuid',
1292                   'state') {
1293       $Job->{$attr} = $Job2->{$attr};
1294     }
1295     if ($Job->{'state'} ne "Running") {
1296       if ($Job->{'state'} eq "Cancelled") {
1297         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1298       } else {
1299         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1300       }
1301       $main::success = 0;
1302       $main::please_freeze = 1;
1303     }
1304   }
1305 }
1306
1307 sub check_squeue
1308 {
1309   my $last_squeue_check = $squeue_checked;
1310
1311   # Do not call `squeue` or check the kill list more than once every
1312   # 15 seconds.
1313   return if $last_squeue_check > time - 15;
1314   $squeue_checked = time;
1315
1316   # Look for children from which we haven't received stderr data since
1317   # the last squeue check. If no such children exist, all procs are
1318   # alive and there's no need to even look at squeue.
1319   #
1320   # As long as the crunchstat poll interval (10s) is shorter than the
1321   # squeue check interval (15s) this should make the squeue check an
1322   # infrequent event.
1323   my $silent_procs = 0;
1324   for my $jobstep (values %proc)
1325   {
1326     if ($jobstep->{stderr_at} < $last_squeue_check)
1327     {
1328       $silent_procs++;
1329     }
1330   }
1331   return if $silent_procs == 0;
1332
1333   # use killem() on procs whose killtime is reached
1334   while (my ($pid, $jobstep) = each %proc)
1335   {
1336     if (exists $jobstep->{killtime}
1337         && $jobstep->{killtime} <= time
1338         && $jobstep->{stderr_at} < $last_squeue_check)
1339     {
1340       my $sincewhen = "";
1341       if ($jobstep->{stderr_at}) {
1342         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1343       }
1344       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1345       killem ($pid);
1346     }
1347   }
1348
1349   if (!$have_slurm)
1350   {
1351     # here is an opportunity to check for mysterious problems with local procs
1352     return;
1353   }
1354
1355   # Get a list of steps still running.  Note: squeue(1) says --steps
1356   # selects a format (which we override anyway) and allows us to
1357   # specify which steps we're interested in (which we don't).
1358   # Importantly, it also changes the meaning of %j from "job name" to
1359   # "step name" and (although this isn't mentioned explicitly in the
1360   # docs) switches from "one line per job" mode to "one line per step"
1361   # mode. Without it, we'd just get a list of one job, instead of a
1362   # list of N steps.
1363   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1364   if ($? != 0)
1365   {
1366     Log(undef, "warning: squeue exit status $? ($!)");
1367     return;
1368   }
1369   chop @squeue;
1370
1371   # which of my jobsteps are running, according to squeue?
1372   my %ok;
1373   for my $jobstepname (@squeue)
1374   {
1375     $ok{$jobstepname} = 1;
1376   }
1377
1378   # Check for child procs >60s old and not mentioned by squeue.
1379   while (my ($pid, $jobstep) = each %proc)
1380   {
1381     if ($jobstep->{time} < time - 60
1382         && $jobstep->{jobstepname}
1383         && !exists $ok{$jobstep->{jobstepname}}
1384         && !exists $jobstep->{killtime})
1385     {
1386       # According to slurm, this task has ended (successfully or not)
1387       # -- but our srun child hasn't exited. First we must wait (30
1388       # seconds) in case this is just a race between communication
1389       # channels. Then, if our srun child process still hasn't
1390       # terminated, we'll conclude some slurm communication
1391       # error/delay has caused the task to die without notifying srun,
1392       # and we'll kill srun ourselves.
1393       $jobstep->{killtime} = time + 30;
1394       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1395     }
1396   }
1397 }
1398
1399
1400 sub release_allocation
1401 {
1402   if ($have_slurm)
1403   {
1404     Log (undef, "release job allocation");
1405     system "scancel $ENV{SLURM_JOB_ID}";
1406   }
1407 }
1408
1409
1410 sub readfrompipes
1411 {
1412   my $gotsome = 0;
1413   foreach my $job (keys %reader)
1414   {
1415     my $buf;
1416     while (0 < sysread ($reader{$job}, $buf, 8192))
1417     {
1418       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1419       $jobstep[$job]->{stderr_at} = time;
1420       $jobstep[$job]->{stderr} .= $buf;
1421       preprocess_stderr ($job);
1422       if (length ($jobstep[$job]->{stderr}) > 16384)
1423       {
1424         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1425       }
1426       $gotsome = 1;
1427     }
1428   }
1429   return $gotsome;
1430 }
1431
1432
1433 sub preprocess_stderr
1434 {
1435   my $job = shift;
1436
1437   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1438     my $line = $1;
1439     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1440     Log ($job, "stderr $line");
1441     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1442       # whoa.
1443       $main::please_freeze = 1;
1444     }
1445     elsif ($line =~ /srun: error: Node failure on/) {
1446       my $job_slot_index = $jobstep[$job]->{slotindex};
1447       $slot[$job_slot_index]->{node}->{fail_count}++;
1448       $jobstep[$job]->{tempfail} = 1;
1449       ban_node_by_slot($job_slot_index);
1450     }
1451     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1452       $jobstep[$job]->{tempfail} = 1;
1453       ban_node_by_slot($jobstep[$job]->{slotindex});
1454     }
1455     elsif ($line =~ /arvados\.errors\.Keep/) {
1456       $jobstep[$job]->{tempfail} = 1;
1457     }
1458   }
1459 }
1460
1461
1462 sub process_stderr
1463 {
1464   my $job = shift;
1465   my $task_success = shift;
1466   preprocess_stderr ($job);
1467
1468   map {
1469     Log ($job, "stderr $_");
1470   } split ("\n", $jobstep[$job]->{stderr});
1471 }
1472
1473 sub fetch_block
1474 {
1475   my $hash = shift;
1476   my $keep;
1477   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1478     Log(undef, "fetch_block run error from arv-get $hash: $!");
1479     return undef;
1480   }
1481   my $output_block = "";
1482   while (1) {
1483     my $buf;
1484     my $bytes = sysread($keep, $buf, 1024 * 1024);
1485     if (!defined $bytes) {
1486       Log(undef, "fetch_block read error from arv-get: $!");
1487       $output_block = undef;
1488       last;
1489     } elsif ($bytes == 0) {
1490       # sysread returns 0 at the end of the pipe.
1491       last;
1492     } else {
1493       # some bytes were read into buf.
1494       $output_block .= $buf;
1495     }
1496   }
1497   close $keep;
1498   if ($?) {
1499     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1500     $output_block = undef;
1501   }
1502   return $output_block;
1503 }
1504
1505 # Create a collection by concatenating the output of all tasks (each
1506 # task's output is either a manifest fragment, a locator for a
1507 # manifest fragment stored in Keep, or nothing at all). Return the
1508 # portable_data_hash of the new collection.
1509 sub create_output_collection
1510 {
1511   Log (undef, "collate");
1512
1513   my ($child_out, $child_in);
1514   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1515 import arvados
1516 import sys
1517 print (arvados.api("v1").collections().
1518        create(body={"manifest_text": sys.stdin.read()}).
1519        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1520 }, retry_count());
1521
1522   my $task_idx = -1;
1523   my $manifest_size = 0;
1524   for (@jobstep)
1525   {
1526     ++$task_idx;
1527     my $output = $_->{'arvados_task'}->{output};
1528     next if (!defined($output));
1529     my $next_write;
1530     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1531       $next_write = fetch_block($output);
1532     } else {
1533       $next_write = $output;
1534     }
1535     if (defined($next_write)) {
1536       if (!defined(syswrite($child_in, $next_write))) {
1537         # There's been an error writing.  Stop the loop.
1538         # We'll log details about the exit code later.
1539         last;
1540       } else {
1541         $manifest_size += length($next_write);
1542       }
1543     } else {
1544       my $uuid = $_->{'arvados_task'}->{'uuid'};
1545       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1546       $main::success = 0;
1547     }
1548   }
1549   close($child_in);
1550   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1551
1552   my $joboutput;
1553   my $s = IO::Select->new($child_out);
1554   if ($s->can_read(120)) {
1555     sysread($child_out, $joboutput, 1024 * 1024);
1556     waitpid($pid, 0);
1557     if ($?) {
1558       Log(undef, "output collection creation exited " . exit_status_s($?));
1559       $joboutput = undef;
1560     } else {
1561       chomp($joboutput);
1562     }
1563   } else {
1564     Log (undef, "timed out while creating output collection");
1565     foreach my $signal (2, 2, 2, 15, 15, 9) {
1566       kill($signal, $pid);
1567       last if waitpid($pid, WNOHANG) == -1;
1568       sleep(1);
1569     }
1570   }
1571   close($child_out);
1572
1573   return $joboutput;
1574 }
1575
1576 # Calls create_output_collection, logs the result, and returns it.
1577 # If that was successful, save that as the output in the job record.
1578 sub save_output_collection {
1579   my $collated_output = create_output_collection();
1580
1581   if (!$collated_output) {
1582     Log(undef, "Failed to write output collection");
1583   }
1584   else {
1585     Log(undef, "job output $collated_output");
1586     $Job->update_attributes('output' => $collated_output);
1587   }
1588   return $collated_output;
1589 }
1590
1591 sub killem
1592 {
1593   foreach (@_)
1594   {
1595     my $sig = 2;                # SIGINT first
1596     if (exists $proc{$_}->{"sent_$sig"} &&
1597         time - $proc{$_}->{"sent_$sig"} > 4)
1598     {
1599       $sig = 15;                # SIGTERM if SIGINT doesn't work
1600     }
1601     if (exists $proc{$_}->{"sent_$sig"} &&
1602         time - $proc{$_}->{"sent_$sig"} > 4)
1603     {
1604       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1605     }
1606     if (!exists $proc{$_}->{"sent_$sig"})
1607     {
1608       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1609       kill $sig, $_;
1610       select (undef, undef, undef, 0.1);
1611       if ($sig == 2)
1612       {
1613         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1614       }
1615       $proc{$_}->{"sent_$sig"} = time;
1616       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1617     }
1618   }
1619 }
1620
1621
1622 sub fhbits
1623 {
1624   my($bits);
1625   for (@_) {
1626     vec($bits,fileno($_),1) = 1;
1627   }
1628   $bits;
1629 }
1630
1631
1632 # Send log output to Keep via arv-put.
1633 #
1634 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1635 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1636 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1637 # $log_pipe_pid is the pid of the arv-put subprocess.
1638 #
1639 # The only functions that should access these variables directly are:
1640 #
1641 # log_writer_start($logfilename)
1642 #     Starts an arv-put pipe, reading data on stdin and writing it to
1643 #     a $logfilename file in an output collection.
1644 #
1645 # log_writer_read_output([$timeout])
1646 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1647 #     Passes $timeout to the select() call, with a default of 0.01.
1648 #     Returns the result of the last read() call on $log_pipe_out, or
1649 #     -1 if read() wasn't called because select() timed out.
1650 #     Only other log_writer_* functions should need to call this.
1651 #
1652 # log_writer_send($txt)
1653 #     Writes $txt to the output log collection.
1654 #
1655 # log_writer_finish()
1656 #     Closes the arv-put pipe and returns the output that it produces.
1657 #
1658 # log_writer_is_active()
1659 #     Returns a true value if there is currently a live arv-put
1660 #     process, false otherwise.
1661 #
1662 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1663     $log_pipe_pid);
1664
1665 sub log_writer_start($)
1666 {
1667   my $logfilename = shift;
1668   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1669                         'arv-put',
1670                         '--stream',
1671                         '--retries', '3',
1672                         '--filename', $logfilename,
1673                         '-');
1674   $log_pipe_out_buf = "";
1675   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1676 }
1677
1678 sub log_writer_read_output {
1679   my $timeout = shift || 0.01;
1680   my $read = -1;
1681   while ($read && $log_pipe_out_select->can_read($timeout)) {
1682     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1683                  length($log_pipe_out_buf));
1684   }
1685   if (!defined($read)) {
1686     Log(undef, "error reading log manifest from arv-put: $!");
1687   }
1688   return $read;
1689 }
1690
1691 sub log_writer_send($)
1692 {
1693   my $txt = shift;
1694   print $log_pipe_in $txt;
1695   log_writer_read_output();
1696 }
1697
1698 sub log_writer_finish()
1699 {
1700   return unless $log_pipe_pid;
1701
1702   close($log_pipe_in);
1703
1704   my $logger_failed = 0;
1705   my $read_result = log_writer_read_output(120);
1706   if ($read_result == -1) {
1707     $logger_failed = -1;
1708     Log (undef, "timed out reading from 'arv-put'");
1709   } elsif ($read_result != 0) {
1710     $logger_failed = -2;
1711     Log(undef, "failed to read arv-put log manifest to EOF");
1712   }
1713
1714   waitpid($log_pipe_pid, 0);
1715   if ($?) {
1716     $logger_failed ||= $?;
1717     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1718   }
1719
1720   close($log_pipe_out);
1721   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1722   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1723       $log_pipe_out_select = undef;
1724
1725   return $arv_put_output;
1726 }
1727
1728 sub log_writer_is_active() {
1729   return $log_pipe_pid;
1730 }
1731
1732 sub Log                         # ($jobstep_id, $logmessage)
1733 {
1734   if ($_[1] =~ /\n/) {
1735     for my $line (split (/\n/, $_[1])) {
1736       Log ($_[0], $line);
1737     }
1738     return;
1739   }
1740   my $fh = select STDERR; $|=1; select $fh;
1741   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1742   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1743   $message .= "\n";
1744   my $datetime;
1745   if (log_writer_is_active() || -t STDERR) {
1746     my @gmtime = gmtime;
1747     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1748                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1749   }
1750   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1751
1752   if (log_writer_is_active()) {
1753     log_writer_send($datetime . " " . $message);
1754   }
1755 }
1756
1757
1758 sub croak
1759 {
1760   my ($package, $file, $line) = caller;
1761   my $message = "@_ at $file line $line\n";
1762   Log (undef, $message);
1763   freeze() if @jobstep_todo;
1764   create_output_collection() if @jobstep_todo;
1765   cleanup();
1766   save_meta();
1767   die;
1768 }
1769
1770
1771 sub cleanup
1772 {
1773   return unless $Job;
1774   if ($Job->{'state'} eq 'Cancelled') {
1775     $Job->update_attributes('finished_at' => scalar gmtime);
1776   } else {
1777     $Job->update_attributes('state' => 'Failed');
1778   }
1779 }
1780
1781
1782 sub save_meta
1783 {
1784   my $justcheckpoint = shift; # false if this will be the last meta saved
1785   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1786   return unless log_writer_is_active();
1787   my $log_manifest = log_writer_finish();
1788   return unless defined($log_manifest);
1789
1790   if ($Job->{log}) {
1791     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1792     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1793   }
1794
1795   my $log_coll = api_call(
1796     "collections/create", ensure_unique_name => 1, collection => {
1797       manifest_text => $log_manifest,
1798       owner_uuid => $Job->{owner_uuid},
1799       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1800     });
1801   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1802   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1803 }
1804
1805
1806 sub freeze_if_want_freeze
1807 {
1808   if ($main::please_freeze)
1809   {
1810     release_allocation();
1811     if (@_)
1812     {
1813       # kill some srun procs before freeze+stop
1814       map { $proc{$_} = {} } @_;
1815       while (%proc)
1816       {
1817         killem (keys %proc);
1818         select (undef, undef, undef, 0.1);
1819         my $died;
1820         while (($died = waitpid (-1, WNOHANG)) > 0)
1821         {
1822           delete $proc{$died};
1823         }
1824       }
1825     }
1826     freeze();
1827     create_output_collection();
1828     cleanup();
1829     save_meta();
1830     exit 1;
1831   }
1832 }
1833
1834
1835 sub freeze
1836 {
1837   Log (undef, "Freeze not implemented");
1838   return;
1839 }
1840
1841
1842 sub thaw
1843 {
1844   croak ("Thaw not implemented");
1845 }
1846
1847
1848 sub freezequote
1849 {
1850   my $s = shift;
1851   $s =~ s/\\/\\\\/g;
1852   $s =~ s/\n/\\n/g;
1853   return $s;
1854 }
1855
1856
1857 sub freezeunquote
1858 {
1859   my $s = shift;
1860   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1861   return $s;
1862 }
1863
1864
1865 sub srun
1866 {
1867   my $srunargs = shift;
1868   my $execargs = shift;
1869   my $opts = shift || {};
1870   my $stdin = shift;
1871   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1872
1873   $Data::Dumper::Terse = 1;
1874   $Data::Dumper::Indent = 0;
1875   my $show_cmd = Dumper($args);
1876   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1877   $show_cmd =~ s/\n/ /g;
1878   if ($opts->{fork}) {
1879     Log(undef, "starting: $show_cmd");
1880   } else {
1881     # This is a child process: parent is in charge of reading our
1882     # stderr and copying it to Log() if needed.
1883     warn "starting: $show_cmd\n";
1884   }
1885
1886   if (defined $stdin) {
1887     my $child = open STDIN, "-|";
1888     defined $child or die "no fork: $!";
1889     if ($child == 0) {
1890       print $stdin or die $!;
1891       close STDOUT or die $!;
1892       exit 0;
1893     }
1894   }
1895
1896   return system (@$args) if $opts->{fork};
1897
1898   exec @$args;
1899   warn "ENV size is ".length(join(" ",%ENV));
1900   die "exec failed: $!: @$args";
1901 }
1902
1903
1904 sub ban_node_by_slot {
1905   # Don't start any new jobsteps on this node for 60 seconds
1906   my $slotid = shift;
1907   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1908   $slot[$slotid]->{node}->{hold_count}++;
1909   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1910 }
1911
1912 sub must_lock_now
1913 {
1914   my ($lockfile, $error_message) = @_;
1915   open L, ">", $lockfile or croak("$lockfile: $!");
1916   if (!flock L, LOCK_EX|LOCK_NB) {
1917     croak("Can't lock $lockfile: $error_message\n");
1918   }
1919 }
1920
1921 sub find_docker_image {
1922   # Given a Keep locator, check to see if it contains a Docker image.
1923   # If so, return its stream name and Docker hash.
1924   # If not, return undef for both values.
1925   my $locator = shift;
1926   my ($streamname, $filename);
1927   my $image = api_call("collections/get", uuid => $locator);
1928   if ($image) {
1929     foreach my $line (split(/\n/, $image->{manifest_text})) {
1930       my @tokens = split(/\s+/, $line);
1931       next if (!@tokens);
1932       $streamname = shift(@tokens);
1933       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1934         if (defined($filename)) {
1935           return (undef, undef);  # More than one file in the Collection.
1936         } else {
1937           $filename = (split(/:/, $filedata, 3))[2];
1938         }
1939       }
1940     }
1941   }
1942   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1943     return ($streamname, $1);
1944   } else {
1945     return (undef, undef);
1946   }
1947 }
1948
1949 sub retry_count {
1950   # Calculate the number of times an operation should be retried,
1951   # assuming exponential backoff, and that we're willing to retry as
1952   # long as tasks have been running.  Enforce a minimum of 3 retries.
1953   my ($starttime, $endtime, $timediff, $retries);
1954   if (@jobstep) {
1955     $starttime = $jobstep[0]->{starttime};
1956     $endtime = $jobstep[-1]->{finishtime};
1957   }
1958   if (!defined($starttime)) {
1959     $timediff = 0;
1960   } elsif (!defined($endtime)) {
1961     $timediff = time - $starttime;
1962   } else {
1963     $timediff = ($endtime - $starttime) - (time - $endtime);
1964   }
1965   if ($timediff > 0) {
1966     $retries = int(log($timediff) / log(2));
1967   } else {
1968     $retries = 1;  # Use the minimum.
1969   }
1970   return ($retries > 3) ? $retries : 3;
1971 }
1972
1973 sub retry_op {
1974   # Pass in two function references.
1975   # This method will be called with the remaining arguments.
1976   # If it dies, retry it with exponential backoff until it succeeds,
1977   # or until the current retry_count is exhausted.  After each failure
1978   # that can be retried, the second function will be called with
1979   # the current try count (0-based), next try time, and error message.
1980   my $operation = shift;
1981   my $retry_callback = shift;
1982   my $retries = retry_count();
1983   foreach my $try_count (0..$retries) {
1984     my $next_try = time + (2 ** $try_count);
1985     my $result = eval { $operation->(@_); };
1986     if (!$@) {
1987       return $result;
1988     } elsif ($try_count < $retries) {
1989       $retry_callback->($try_count, $next_try, $@);
1990       my $sleep_time = $next_try - time;
1991       sleep($sleep_time) if ($sleep_time > 0);
1992     }
1993   }
1994   # Ensure the error message ends in a newline, so Perl doesn't add
1995   # retry_op's line number to it.
1996   chomp($@);
1997   die($@ . "\n");
1998 }
1999
2000 sub api_call {
2001   # Pass in a /-separated API method name, and arguments for it.
2002   # This function will call that method, retrying as needed until
2003   # the current retry_count is exhausted, with a log on the first failure.
2004   my $method_name = shift;
2005   my $log_api_retry = sub {
2006     my ($try_count, $next_try_at, $errmsg) = @_;
2007     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2008     $errmsg =~ s/\s/ /g;
2009     $errmsg =~ s/\s+$//;
2010     my $retry_msg;
2011     if ($next_try_at < time) {
2012       $retry_msg = "Retrying.";
2013     } else {
2014       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2015       $retry_msg = "Retrying at $next_try_fmt.";
2016     }
2017     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2018   };
2019   my $method = $arv;
2020   foreach my $key (split(/\//, $method_name)) {
2021     $method = $method->{$key};
2022   }
2023   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2024 }
2025
2026 sub exit_status_s {
2027   # Given a $?, return a human-readable exit code string like "0" or
2028   # "1" or "0 with signal 1" or "1 with signal 11".
2029   my $exitcode = shift;
2030   my $s = $exitcode >> 8;
2031   if ($exitcode & 0x7f) {
2032     $s .= " with signal " . ($exitcode & 0x7f);
2033   }
2034   if ($exitcode & 0x80) {
2035     $s .= " with core dump";
2036   }
2037   return $s;
2038 }
2039
2040 sub handle_readall {
2041   # Pass in a glob reference to a file handle.
2042   # Read all its contents and return them as a string.
2043   my $fh_glob_ref = shift;
2044   local $/ = undef;
2045   return <$fh_glob_ref>;
2046 }
2047
2048 sub tar_filename_n {
2049   my $n = shift;
2050   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2051 }
2052
2053 sub add_git_archive {
2054   # Pass in a git archive command as a string or list, a la system().
2055   # This method will save its output to be included in the archive sent to the
2056   # build script.
2057   my $git_input;
2058   $git_tar_count++;
2059   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2060     croak("Failed to save git archive: $!");
2061   }
2062   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2063   close($git_input);
2064   waitpid($git_pid, 0);
2065   close(GIT_ARCHIVE);
2066   if ($?) {
2067     croak("Failed to save git archive: git exited " . exit_status_s($?));
2068   }
2069 }
2070
2071 sub combined_git_archive {
2072   # Combine all saved tar archives into a single archive, then return its
2073   # contents in a string.  Return undef if no archives have been saved.
2074   if ($git_tar_count < 1) {
2075     return undef;
2076   }
2077   my $base_tar_name = tar_filename_n(1);
2078   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2079     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2080     if ($tar_exit != 0) {
2081       croak("Error preparing build archive: tar -A exited " .
2082             exit_status_s($tar_exit));
2083     }
2084   }
2085   if (!open(GIT_TAR, "<", $base_tar_name)) {
2086     croak("Could not open build archive: $!");
2087   }
2088   my $tar_contents = handle_readall(\*GIT_TAR);
2089   close(GIT_TAR);
2090   return $tar_contents;
2091 }
2092
2093 sub set_nonblocking {
2094   my $fh = shift;
2095   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2096   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2097 }
2098
2099 __DATA__
2100 #!/usr/bin/env perl
2101 #
2102 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2103 # server invokes this script on individual compute nodes, or localhost if we're
2104 # running a job locally.  It gets called in two modes:
2105 #
2106 # * No arguments: Installation mode.  Read a tar archive from the DATA
2107 #   file handle; it includes the Crunch script's source code, and
2108 #   maybe SDKs as well.  Those should be installed in the proper
2109 #   locations.  This runs outside of any Docker container, so don't try to
2110 #   introspect Crunch's runtime environment.
2111 #
2112 # * With arguments: Crunch script run mode.  This script should set up the
2113 #   environment, then run the command specified in the arguments.  This runs
2114 #   inside any Docker container.
2115
2116 use Fcntl ':flock';
2117 use File::Path qw( make_path remove_tree );
2118 use POSIX qw(getcwd);
2119
2120 use constant TASK_TEMPFAIL => 111;
2121
2122 # Map SDK subdirectories to the path environments they belong to.
2123 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2124
2125 my $destdir = $ENV{"CRUNCH_SRC"};
2126 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2127 my $repo = $ENV{"CRUNCH_SRC_URL"};
2128 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2129 my $job_work = $ENV{"JOB_WORK"};
2130 my $task_work = $ENV{"TASK_WORK"};
2131
2132 open(STDOUT_ORIG, ">&", STDOUT);
2133 open(STDERR_ORIG, ">&", STDERR);
2134
2135 for my $dir ($destdir, $job_work, $task_work) {
2136   if ($dir) {
2137     make_path $dir;
2138     -e $dir or die "Failed to create temporary directory ($dir): $!";
2139   }
2140 }
2141
2142 if ($task_work) {
2143   remove_tree($task_work, {keep_root => 1});
2144 }
2145
2146 ### Crunch script run mode
2147 if (@ARGV) {
2148   # We want to do routine logging during task 0 only.  This gives the user
2149   # the information they need, but avoids repeating the information for every
2150   # task.
2151   my $Log;
2152   if ($ENV{TASK_SEQUENCE} eq "0") {
2153     $Log = sub {
2154       my $msg = shift;
2155       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2156     };
2157   } else {
2158     $Log = sub { };
2159   }
2160
2161   my $python_src = "$install_dir/python";
2162   my $venv_dir = "$job_work/.arvados.venv";
2163   my $venv_built = -e "$venv_dir/bin/activate";
2164   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2165     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2166                  "--python=python2.7", $venv_dir);
2167     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2168     $venv_built = 1;
2169     $Log->("Built Python SDK virtualenv");
2170   }
2171
2172   my $pip_bin = "pip";
2173   if ($venv_built) {
2174     $Log->("Running in Python SDK virtualenv");
2175     $pip_bin = "$venv_dir/bin/pip";
2176     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2177     @ARGV = ("/bin/sh", "-ec",
2178              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2179   } elsif (-d $python_src) {
2180     $Log->("Warning: virtualenv not found inside Docker container default " .
2181            "\$PATH. Can't install Python SDK.");
2182   }
2183
2184   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2185   if ($pkgs) {
2186     $Log->("Using Arvados SDK:");
2187     foreach my $line (split /\n/, $pkgs) {
2188       $Log->($line);
2189     }
2190   } else {
2191     $Log->("Arvados SDK packages not found");
2192   }
2193
2194   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2195     my $sdk_path = "$install_dir/$sdk_dir";
2196     if (-d $sdk_path) {
2197       if ($ENV{$sdk_envkey}) {
2198         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2199       } else {
2200         $ENV{$sdk_envkey} = $sdk_path;
2201       }
2202       $Log->("Arvados SDK added to %s", $sdk_envkey);
2203     }
2204   }
2205
2206   exec(@ARGV);
2207   die "Cannot exec `@ARGV`: $!";
2208 }
2209
2210 ### Installation mode
2211 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2212 flock L, LOCK_EX;
2213 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2214   # This exact git archive (source + arvados sdk) is already installed
2215   # here, so there's no need to reinstall it.
2216
2217   # We must consume our DATA section, though: otherwise the process
2218   # feeding it to us will get SIGPIPE.
2219   my $buf;
2220   while (read(DATA, $buf, 65536)) { }
2221
2222   exit(0);
2223 }
2224
2225 unlink "$destdir.archive_hash";
2226 mkdir $destdir;
2227
2228 do {
2229   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2230   local $SIG{PIPE} = "IGNORE";
2231   warn "Extracting archive: $archive_hash\n";
2232   # --ignore-zeros is necessary sometimes: depending on how much NUL
2233   # padding tar -A put on our combined archive (which in turn depends
2234   # on the length of the component archives) tar without
2235   # --ignore-zeros will exit before consuming stdin and cause close()
2236   # to fail on the resulting SIGPIPE.
2237   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2238     die "Error launching 'tar -xC $destdir': $!";
2239   }
2240   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2241   # get SIGPIPE.  We must feed it data incrementally.
2242   my $tar_input;
2243   while (read(DATA, $tar_input, 65536)) {
2244     print TARX $tar_input;
2245   }
2246   if(!close(TARX)) {
2247     die "'tar -xC $destdir' exited $?: $!";
2248   }
2249 };
2250
2251 mkdir $install_dir;
2252
2253 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2254 if (-d $sdk_root) {
2255   foreach my $sdk_lang (("python",
2256                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2257     if (-d "$sdk_root/$sdk_lang") {
2258       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2259         die "Failed to install $sdk_lang SDK: $!";
2260       }
2261     }
2262   }
2263 }
2264
2265 my $python_dir = "$install_dir/python";
2266 if ((-d $python_dir) and can_run("python2.7")) {
2267   open(my $egg_info_pipe, "-|",
2268        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2269   my @egg_info_errors = <$egg_info_pipe>;
2270   close($egg_info_pipe);
2271
2272   if ($?) {
2273     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2274       # egg_info apparently failed because it couldn't ask git for a build tag.
2275       # Specify no build tag.
2276       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2277       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2278       close($pysdk_cfg);
2279     } else {
2280       my $egg_info_exit = $? >> 8;
2281       foreach my $errline (@egg_info_errors) {
2282         warn $errline;
2283       }
2284       warn "python setup.py egg_info failed: exit $egg_info_exit";
2285       exit ($egg_info_exit || 1);
2286     }
2287   }
2288 }
2289
2290 # Hide messages from the install script (unless it fails: shell_or_die
2291 # will show $destdir.log in that case).
2292 open(STDOUT, ">>", "$destdir.log");
2293 open(STDERR, ">&", STDOUT);
2294
2295 if (-e "$destdir/crunch_scripts/install") {
2296     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2297 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2298     # Old version
2299     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2300 } elsif (-e "./install.sh") {
2301     shell_or_die (undef, "./install.sh", $install_dir);
2302 }
2303
2304 if ($archive_hash) {
2305     unlink "$destdir.archive_hash.new";
2306     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2307     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2308 }
2309
2310 close L;
2311
2312 sub can_run {
2313   my $command_name = shift;
2314   open(my $which, "-|", "which", $command_name);
2315   while (<$which>) { }
2316   close($which);
2317   return ($? == 0);
2318 }
2319
2320 sub shell_or_die
2321 {
2322   my $exitcode = shift;
2323
2324   if ($ENV{"DEBUG"}) {
2325     print STDERR "@_\n";
2326   }
2327   if (system (@_) != 0) {
2328     my $err = $!;
2329     my $code = $?;
2330     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2331     open STDERR, ">&STDERR_ORIG";
2332     system ("cat $destdir.log >&2");
2333     warn "@_ failed ($err): $exitstatus";
2334     if (defined($exitcode)) {
2335       exit $exitcode;
2336     }
2337     else {
2338       exit (($code >> 8) || 1);
2339     }
2340   }
2341 }
2342
2343 __DATA__