8815: Now expect /usr/local/bin/crunchrunner. Bind mount host certificates to
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 $ENV{"HOST_CRUNCHRUNNER_BIN"} ||= `which crunchrunner`;
113 unless (defined($ENV{"HOST_CERTS"}) {
114   if (-f "/etc/ssl/certs/ca-certificates.crt") {
115     $ENV{"HOST_CERTS"} = "/etc/ssl/certs/ca-certificates.crt";
116   } else if (-f "/etc/pki/tls/certs/ca-bundle.crt") {
117     $ENV{"HOST_CERTS"} = "/etc/pki/tls/certs/ca-bundle.crt";
118 }
119
120 # Create the tmp directory if it does not exist
121 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
122   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
123 }
124
125 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
126 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
127 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
128 mkdir ($ENV{"JOB_WORK"});
129
130 my %proc;
131 my $force_unlock;
132 my $git_dir;
133 my $jobspec;
134 my $job_api_token;
135 my $no_clear_tmp;
136 my $resume_stash;
137 my $cgroup_root = "/sys/fs/cgroup";
138 my $docker_bin = "docker.io";
139 my $docker_run_args = "";
140 GetOptions('force-unlock' => \$force_unlock,
141            'git-dir=s' => \$git_dir,
142            'job=s' => \$jobspec,
143            'job-api-token=s' => \$job_api_token,
144            'no-clear-tmp' => \$no_clear_tmp,
145            'resume-stash=s' => \$resume_stash,
146            'cgroup-root=s' => \$cgroup_root,
147            'docker-bin=s' => \$docker_bin,
148            'docker-run-args=s' => \$docker_run_args,
149     );
150
151 if (defined $job_api_token) {
152   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
153 }
154
155 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
156
157
158 $SIG{'USR1'} = sub
159 {
160   $main::ENV{CRUNCH_DEBUG} = 1;
161 };
162 $SIG{'USR2'} = sub
163 {
164   $main::ENV{CRUNCH_DEBUG} = 0;
165 };
166
167 my $arv = Arvados->new('apiVersion' => 'v1');
168
169 my $Job;
170 my $job_id;
171 my $dbh;
172 my $sth;
173 my @jobstep;
174
175 my $local_job;
176 if ($jobspec =~ /^[-a-z\d]+$/)
177 {
178   # $jobspec is an Arvados UUID, not a JSON job specification
179   $Job = api_call("jobs/get", uuid => $jobspec);
180   $local_job = 0;
181 }
182 else
183 {
184   $local_job = JSON::decode_json($jobspec);
185 }
186
187
188 # Make sure our workers (our slurm nodes, localhost, or whatever) are
189 # at least able to run basic commands: they aren't down or severely
190 # misconfigured.
191 my $cmd = ['true'];
192 if (($Job || $local_job)->{docker_image_locator}) {
193   $cmd = [$docker_bin, 'ps', '-q'];
194 }
195 Log(undef, "Sanity check is `@$cmd`");
196 my ($exited, $stdout, $stderr) = srun_sync(
197   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
198   $cmd,
199   {label => "sanity check"});
200 if ($exited != 0) {
201   Log(undef, "Sanity check failed: ".exit_status_s($exited));
202   exit EX_TEMPFAIL;
203 }
204 Log(undef, "Sanity check OK");
205
206
207 my $User = api_call("users/current");
208
209 if (!$local_job) {
210   if (!$force_unlock) {
211     # Claim this job, and make sure nobody else does
212     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
213     if ($@) {
214       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
215       exit EX_TEMPFAIL;
216     };
217   }
218 }
219 else
220 {
221   if (!$resume_stash)
222   {
223     map { croak ("No $_ specified") unless $local_job->{$_} }
224     qw(script script_version script_parameters);
225   }
226
227   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
228   $local_job->{'started_at'} = gmtime;
229   $local_job->{'state'} = 'Running';
230
231   $Job = api_call("jobs/create", job => $local_job);
232 }
233 $job_id = $Job->{'uuid'};
234
235 my $keep_logfile = $job_id . '.log.txt';
236 log_writer_start($keep_logfile);
237
238 $Job->{'runtime_constraints'} ||= {};
239 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
240 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
241
242 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
243 if ($? == 0) {
244   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
245   chomp($gem_versions);
246   chop($gem_versions);  # Closing parentheses
247 } else {
248   $gem_versions = "";
249 }
250 Log(undef,
251     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
252
253 Log (undef, "check slurm allocation");
254 my @slot;
255 my @node;
256 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
257 my @sinfo;
258 if (!$have_slurm)
259 {
260   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
261   push @sinfo, "$localcpus localhost";
262 }
263 if (exists $ENV{SLURM_NODELIST})
264 {
265   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
266 }
267 foreach (@sinfo)
268 {
269   my ($ncpus, $slurm_nodelist) = split;
270   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
271
272   my @nodelist;
273   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
274   {
275     my $nodelist = $1;
276     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
277     {
278       my $ranges = $1;
279       foreach (split (",", $ranges))
280       {
281         my ($a, $b);
282         if (/(\d+)-(\d+)/)
283         {
284           $a = $1;
285           $b = $2;
286         }
287         else
288         {
289           $a = $_;
290           $b = $_;
291         }
292         push @nodelist, map {
293           my $n = $nodelist;
294           $n =~ s/\[[-,\d]+\]/$_/;
295           $n;
296         } ($a..$b);
297       }
298     }
299     else
300     {
301       push @nodelist, $nodelist;
302     }
303   }
304   foreach my $nodename (@nodelist)
305   {
306     Log (undef, "node $nodename - $ncpus slots");
307     my $node = { name => $nodename,
308                  ncpus => $ncpus,
309                  # The number of consecutive times a task has been dispatched
310                  # to this node and failed.
311                  losing_streak => 0,
312                  # The number of consecutive times that SLURM has reported
313                  # a node failure since the last successful task.
314                  fail_count => 0,
315                  # Don't dispatch work to this node until this time
316                  # (in seconds since the epoch) has passed.
317                  hold_until => 0 };
318     foreach my $cpu (1..$ncpus)
319     {
320       push @slot, { node => $node,
321                     cpu => $cpu };
322     }
323   }
324   push @node, @nodelist;
325 }
326
327
328
329 # Ensure that we get one jobstep running on each allocated node before
330 # we start overloading nodes with concurrent steps
331
332 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
333
334
335 $Job->update_attributes(
336   'tasks_summary' => { 'failed' => 0,
337                        'todo' => 1,
338                        'running' => 0,
339                        'done' => 0 });
340
341 Log (undef, "start");
342 $SIG{'INT'} = sub { $main::please_freeze = 1; };
343 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
344 $SIG{'TERM'} = \&croak;
345 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
346 $SIG{'ALRM'} = sub { $main::please_info = 1; };
347 $SIG{'CONT'} = sub { $main::please_continue = 1; };
348 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
349
350 $main::please_freeze = 0;
351 $main::please_info = 0;
352 $main::please_continue = 0;
353 $main::please_refresh = 0;
354 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
355
356 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
357 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
358 $ENV{"JOB_UUID"} = $job_id;
359
360
361 my @jobstep_todo = ();
362 my @jobstep_done = ();
363 my @jobstep_tomerge = ();
364 my $jobstep_tomerge_level = 0;
365 my $squeue_checked = 0;
366 my $latest_refresh = scalar time;
367
368
369
370 if (defined $Job->{thawedfromkey})
371 {
372   thaw ($Job->{thawedfromkey});
373 }
374 else
375 {
376   my $first_task = api_call("job_tasks/create", job_task => {
377     'job_uuid' => $Job->{'uuid'},
378     'sequence' => 0,
379     'qsequence' => 0,
380     'parameters' => {},
381   });
382   push @jobstep, { 'level' => 0,
383                    'failures' => 0,
384                    'arvados_task' => $first_task,
385                  };
386   push @jobstep_todo, 0;
387 }
388
389
390 if (!$have_slurm)
391 {
392   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
393 }
394
395 my $build_script = handle_readall(\*DATA);
396 my $nodelist = join(",", @node);
397 my $git_tar_count = 0;
398
399 if (!defined $no_clear_tmp) {
400   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
401   # up work directories crunch_tmp/work, crunch_tmp/opt,
402   # crunch_tmp/src*.
403   #
404   # TODO: When #5036 is done and widely deployed, we can limit mount's
405   # -t option to simply fuse.keep.
406   my ($exited, $stdout, $stderr) = srun_sync(
407     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
408     ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
409     {label => "clean work dirs"});
410   if ($exited != 0) {
411     exit(EX_RETRY_UNLOCKED);
412   }
413 }
414
415 # If this job requires a Docker image, install that.
416 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
417 if ($docker_locator = $Job->{docker_image_locator}) {
418   Log (undef, "Install docker image $docker_locator");
419   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
420   if (!$docker_hash)
421   {
422     croak("No Docker image hash found from locator $docker_locator");
423   }
424   Log (undef, "docker image hash is $docker_hash");
425   $docker_stream =~ s/^\.//;
426   my $docker_install_script = qq{
427 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
428     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
429 fi
430 };
431
432   my ($exited, $stdout, $stderr) = srun_sync(
433     ["srun", "--nodelist=" . join(',', @node)],
434     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
435     {label => "load docker image"});
436   if ($exited != 0)
437   {
438     exit(EX_RETRY_UNLOCKED);
439   }
440
441   # Determine whether this version of Docker supports memory+swap limits.
442   ($exited, $stdout, $stderr) = srun_sync(
443     ["srun", "--nodes=1"],
444     [$docker_bin, 'run', '--help'],
445     {label => "check --memory-swap feature"});
446   $docker_limitmem = ($stdout =~ /--memory-swap/);
447
448   # Find a non-root Docker user to use.
449   # Tries the default user for the container, then 'crunch', then 'nobody',
450   # testing for whether the actual user id is non-zero.  This defends against
451   # mistakes but not malice, but we intend to harden the security in the future
452   # so we don't want anyone getting used to their jobs running as root in their
453   # Docker containers.
454   my @tryusers = ("", "crunch", "nobody");
455   foreach my $try_user (@tryusers) {
456     my $label;
457     my $try_user_arg;
458     if ($try_user eq "") {
459       $label = "check whether default user is UID 0";
460       $try_user_arg = "";
461     } else {
462       $label = "check whether user '$try_user' is UID 0";
463       $try_user_arg = "--user=$try_user";
464     }
465     my ($exited, $stdout, $stderr) = srun_sync(
466       ["srun", "--nodes=1"],
467       ["/bin/sh", "-ec",
468        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
469       {label => $label});
470     chomp($stdout);
471     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
472       $dockeruserarg = $try_user_arg;
473       if ($try_user eq "") {
474         Log(undef, "Container will run with default user");
475       } else {
476         Log(undef, "Container will run with $dockeruserarg");
477       }
478       last;
479     }
480   }
481
482   if (!defined $dockeruserarg) {
483     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
484   }
485
486   if ($Job->{arvados_sdk_version}) {
487     # The job also specifies an Arvados SDK version.  Add the SDKs to the
488     # tar file for the build script to install.
489     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
490                        $Job->{arvados_sdk_version}));
491     add_git_archive("git", "--git-dir=$git_dir", "archive",
492                     "--prefix=.arvados.sdk/",
493                     $Job->{arvados_sdk_version}, "sdk");
494   }
495 }
496
497 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
498   # If script_version looks like an absolute path, *and* the --git-dir
499   # argument was not given -- which implies we were not invoked by
500   # crunch-dispatch -- we will use the given path as a working
501   # directory instead of resolving script_version to a git commit (or
502   # doing anything else with git).
503   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
504   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
505 }
506 else {
507   # Resolve the given script_version to a git commit sha1. Also, if
508   # the repository is remote, clone it into our local filesystem: this
509   # ensures "git archive" will work, and is necessary to reliably
510   # resolve a symbolic script_version like "master^".
511   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
512
513   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
514
515   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
516
517   # If we're running under crunch-dispatch, it will have already
518   # pulled the appropriate source tree into its own repository, and
519   # given us that repo's path as $git_dir.
520   #
521   # If we're running a "local" job, we might have to fetch content
522   # from a remote repository.
523   #
524   # (Currently crunch-dispatch gives a local path with --git-dir, but
525   # we might as well accept URLs there too in case it changes its
526   # mind.)
527   my $repo = $git_dir || $Job->{'repository'};
528
529   # Repository can be remote or local. If remote, we'll need to fetch it
530   # to a local dir before doing `git log` et al.
531   my $repo_location;
532
533   if ($repo =~ m{://|^[^/]*:}) {
534     # $repo is a git url we can clone, like git:// or https:// or
535     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
536     # not recognized here because distinguishing that from a local
537     # path is too fragile. If you really need something strange here,
538     # use the ssh:// form.
539     $repo_location = 'remote';
540   } elsif ($repo =~ m{^\.*/}) {
541     # $repo is a local path to a git index. We'll also resolve ../foo
542     # to ../foo/.git if the latter is a directory. To help
543     # disambiguate local paths from named hosted repositories, this
544     # form must be given as ./ or ../ if it's a relative path.
545     if (-d "$repo/.git") {
546       $repo = "$repo/.git";
547     }
548     $repo_location = 'local';
549   } else {
550     # $repo is none of the above. It must be the name of a hosted
551     # repository.
552     my $arv_repo_list = api_call("repositories/list",
553                                  'filters' => [['name','=',$repo]]);
554     my @repos_found = @{$arv_repo_list->{'items'}};
555     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
556     if ($n_found > 0) {
557       Log(undef, "Repository '$repo' -> "
558           . join(", ", map { $_->{'uuid'} } @repos_found));
559     }
560     if ($n_found != 1) {
561       croak("Error: Found $n_found repositories with name '$repo'.");
562     }
563     $repo = $repos_found[0]->{'fetch_url'};
564     $repo_location = 'remote';
565   }
566   Log(undef, "Using $repo_location repository '$repo'");
567   $ENV{"CRUNCH_SRC_URL"} = $repo;
568
569   # Resolve given script_version (we'll call that $treeish here) to a
570   # commit sha1 ($commit).
571   my $treeish = $Job->{'script_version'};
572   my $commit;
573   if ($repo_location eq 'remote') {
574     # We minimize excess object-fetching by re-using the same bare
575     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
576     # just keep adding remotes to it as needed.
577     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
578     my $gitcmd = "git --git-dir=\Q$local_repo\E";
579
580     # Set up our local repo for caching remote objects, making
581     # archives, etc.
582     if (!-d $local_repo) {
583       make_path($local_repo) or croak("Error: could not create $local_repo");
584     }
585     # This works (exits 0 and doesn't delete fetched objects) even
586     # if $local_repo is already initialized:
587     `$gitcmd init --bare`;
588     if ($?) {
589       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
590     }
591
592     # If $treeish looks like a hash (or abbrev hash) we look it up in
593     # our local cache first, since that's cheaper. (We don't want to
594     # do that with tags/branches though -- those change over time, so
595     # they should always be resolved by the remote repo.)
596     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
597       # Hide stderr because it's normal for this to fail:
598       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
599       if ($? == 0 &&
600           # Careful not to resolve a branch named abcdeff to commit 1234567:
601           $sha1 =~ /^$treeish/ &&
602           $sha1 =~ /^([0-9a-f]{40})$/s) {
603         $commit = $1;
604         Log(undef, "Commit $commit already present in $local_repo");
605       }
606     }
607
608     if (!defined $commit) {
609       # If $treeish isn't just a hash or abbrev hash, or isn't here
610       # yet, we need to fetch the remote to resolve it correctly.
611
612       # First, remove all local heads. This prevents a name that does
613       # not exist on the remote from resolving to (or colliding with)
614       # a previously fetched branch or tag (possibly from a different
615       # remote).
616       remove_tree("$local_repo/refs/heads", {keep_root => 1});
617
618       Log(undef, "Fetching objects from $repo to $local_repo");
619       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
620       if ($?) {
621         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
622       }
623     }
624
625     # Now that the data is all here, we will use our local repo for
626     # the rest of our git activities.
627     $repo = $local_repo;
628   }
629
630   my $gitcmd = "git --git-dir=\Q$repo\E";
631   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
632   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
633     croak("`$gitcmd rev-list` exited "
634           .exit_status_s($?)
635           .", '$treeish' not found, giving up");
636   }
637   $commit = $1;
638   Log(undef, "Version $treeish is commit $commit");
639
640   if ($commit ne $Job->{'script_version'}) {
641     # Record the real commit id in the database, frozentokey, logs,
642     # etc. -- instead of an abbreviation or a branch name which can
643     # become ambiguous or point to a different commit in the future.
644     if (!$Job->update_attributes('script_version' => $commit)) {
645       croak("Error: failed to update job's script_version attribute");
646     }
647   }
648
649   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
650   add_git_archive("$gitcmd archive ''\Q$commit\E");
651 }
652
653 my $git_archive = combined_git_archive();
654 if (!defined $git_archive) {
655   Log(undef, "Skip install phase (no git archive)");
656   if ($have_slurm) {
657     Log(undef, "Warning: This probably means workers have no source tree!");
658   }
659 }
660 else {
661   my $exited;
662   my $install_script_tries_left = 3;
663   for (my $attempts = 0; $attempts < 3; $attempts++) {
664     my @srunargs = ("srun",
665                     "--nodelist=$nodelist",
666                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
667     my @execargs = ("sh", "-c",
668                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
669
670     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
671     my ($stdout, $stderr);
672     ($exited, $stdout, $stderr) = srun_sync(
673       \@srunargs, \@execargs,
674       {label => "run install script on all workers"},
675       $build_script . $git_archive);
676
677     my $stderr_anything_from_script = 0;
678     for my $line (split(/\n/, $stderr)) {
679       if ($line !~ /^(srun: error: |starting: \[)/) {
680         $stderr_anything_from_script = 1;
681       }
682     }
683
684     last if $exited == 0 || $main::please_freeze;
685
686     # If the install script fails but doesn't print an error message,
687     # the next thing anyone is likely to do is just run it again in
688     # case it was a transient problem like "slurm communication fails
689     # because the network isn't reliable enough". So we'll just do
690     # that ourselves (up to 3 attempts in total). OTOH, if there is an
691     # error message, the problem is more likely to have a real fix and
692     # we should fail the job so the fixing process can start, instead
693     # of doing 2 more attempts.
694     last if $stderr_anything_from_script;
695   }
696
697   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
698     unlink($tar_filename);
699   }
700
701   if ($exited != 0) {
702     croak("Giving up");
703   }
704 }
705
706 foreach (qw (script script_version script_parameters runtime_constraints))
707 {
708   Log (undef,
709        "$_ " .
710        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
711 }
712 foreach (split (/\n/, $Job->{knobs}))
713 {
714   Log (undef, "knob " . $_);
715 }
716
717
718
719 $main::success = undef;
720
721
722
723 ONELEVEL:
724
725 my $thisround_succeeded = 0;
726 my $thisround_failed = 0;
727 my $thisround_failed_multiple = 0;
728 my $working_slot_count = scalar(@slot);
729
730 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
731                        or $a <=> $b } @jobstep_todo;
732 my $level = $jobstep[$jobstep_todo[0]]->{level};
733
734 my $initial_tasks_this_level = 0;
735 foreach my $id (@jobstep_todo) {
736   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
737 }
738
739 # If the number of tasks scheduled at this level #T is smaller than the number
740 # of slots available #S, only use the first #T slots, or the first slot on
741 # each node, whichever number is greater.
742 #
743 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
744 # based on these numbers.  Using fewer slots makes more resources available
745 # to each individual task, which should normally be a better strategy when
746 # there are fewer of them running with less parallelism.
747 #
748 # Note that this calculation is not redone if the initial tasks at
749 # this level queue more tasks at the same level.  This may harm
750 # overall task throughput for that level.
751 my @freeslot;
752 if ($initial_tasks_this_level < @node) {
753   @freeslot = (0..$#node);
754 } elsif ($initial_tasks_this_level < @slot) {
755   @freeslot = (0..$initial_tasks_this_level - 1);
756 } else {
757   @freeslot = (0..$#slot);
758 }
759 my $round_num_freeslots = scalar(@freeslot);
760 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
761
762 my %round_max_slots = ();
763 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
764   my $this_slot = $slot[$freeslot[$ii]];
765   my $node_name = $this_slot->{node}->{name};
766   $round_max_slots{$node_name} ||= $this_slot->{cpu};
767   last if (scalar(keys(%round_max_slots)) >= @node);
768 }
769
770 Log(undef, "start level $level with $round_num_freeslots slots");
771 my @holdslot;
772 my %reader;
773 my $progress_is_dirty = 1;
774 my $progress_stats_updated = 0;
775
776 update_progress_stats();
777
778
779 THISROUND:
780 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
781 {
782   # Don't create new tasks if we already know the job's final result.
783   last if defined($main::success);
784
785   my $id = $jobstep_todo[$todo_ptr];
786   my $Jobstep = $jobstep[$id];
787   if ($Jobstep->{level} != $level)
788   {
789     next;
790   }
791
792   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
793   set_nonblocking($reader{$id});
794
795   my $childslot = $freeslot[0];
796   my $childnode = $slot[$childslot]->{node};
797   my $childslotname = join (".",
798                             $slot[$childslot]->{node}->{name},
799                             $slot[$childslot]->{cpu});
800
801   my $childpid = fork();
802   if ($childpid == 0)
803   {
804     $SIG{'INT'} = 'DEFAULT';
805     $SIG{'QUIT'} = 'DEFAULT';
806     $SIG{'TERM'} = 'DEFAULT';
807
808     foreach (values (%reader))
809     {
810       close($_);
811     }
812     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
813     open(STDOUT,">&writer");
814     open(STDERR,">&writer");
815
816     undef $dbh;
817     undef $sth;
818
819     delete $ENV{"GNUPGHOME"};
820     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
821     $ENV{"TASK_QSEQUENCE"} = $id;
822     $ENV{"TASK_SEQUENCE"} = $level;
823     $ENV{"JOB_SCRIPT"} = $Job->{script};
824     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
825       $param =~ tr/a-z/A-Z/;
826       $ENV{"JOB_PARAMETER_$param"} = $value;
827     }
828     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
829     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
830     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
831     $ENV{"HOME"} = $ENV{"TASK_WORK"};
832     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
833     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
834     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
835
836     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
837
838     $ENV{"GZIP"} = "-n";
839
840     my @srunargs = (
841       "srun",
842       "--nodelist=".$childnode->{name},
843       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
844       "--job-name=$job_id.$id.$$",
845         );
846
847     my $stdbuf = " stdbuf --output=0 --error=0 ";
848
849     my $arv_file_cache = "";
850     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
851       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
852     }
853
854     my $command =
855         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
856         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
857         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
858         # These environment variables get used explicitly later in
859         # $command.  No tool is expected to read these values directly.
860         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
861         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
862         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
863         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
864
865     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
866     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
867     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
868
869     if ($docker_hash)
870     {
871       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
872       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
873       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
874       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
875       # We only set memory limits if Docker lets us limit both memory and swap.
876       # Memory limits alone have been supported longer, but subprocesses tend
877       # to get SIGKILL if they exceed that without any swap limit set.
878       # See #5642 for additional background.
879       if ($docker_limitmem) {
880         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
881       }
882
883       # The source tree and $destdir directory (which we have
884       # installed on the worker host) are available in the container,
885       # under the same path.
886       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
887       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
888
889       # Currently, we make the "by_pdh" directory in arv-mount's mount
890       # point appear at /keep inside the container (instead of using
891       # the same path as the host like we do with CRUNCH_SRC and
892       # CRUNCH_INSTALL). However, crunch scripts and utilities must
893       # not rely on this. They must use $TASK_KEEPMOUNT.
894       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
895       $ENV{TASK_KEEPMOUNT} = "/keep";
896
897       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
898       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
899       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
900
901       # TASK_WORK is almost exactly like a docker data volume: it
902       # starts out empty, is writable, and persists until no
903       # containers use it any more. We don't use --volumes-from to
904       # share it with other containers: it is only accessible to this
905       # task, and it goes away when this task stops.
906       #
907       # However, a docker data volume is writable only by root unless
908       # the mount point already happens to exist in the container with
909       # different permissions. Therefore, we [1] assume /tmp already
910       # exists in the image and is writable by the crunch user; [2]
911       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
912       # writable if they are created by docker while setting up the
913       # other --volumes); and [3] create $TASK_WORK inside the
914       # container using $build_script.
915       $command .= "--volume=/tmp ";
916       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
917       $ENV{"HOME"} = $ENV{"TASK_WORK"};
918       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
919
920       # TODO: Share a single JOB_WORK volume across all task
921       # containers on a given worker node, and delete it when the job
922       # ends (and, in case that doesn't work, when the next job
923       # starts).
924       #
925       # For now, use the same approach as TASK_WORK above.
926       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
927
928       # Bind mount the crunchrunner binary and host TLS certificates file into
929       # the container.
930       $command .= "--volume=\Q$ENV{HOST_CRUNCHRUNNER_BIN}:/usr/local/bin/crunchrunner\E ";
931       $command .= "--volume=\Q$ENV{HOST_CERTS}:/etc/arvados/ca-certificates.crt\E ";
932
933       while (my ($env_key, $env_val) = each %ENV)
934       {
935         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
936           $command .= "--env=\Q$env_key=$env_val\E ";
937         }
938       }
939       $command .= "--env=\QHOME=$ENV{HOME}\E ";
940       $command .= "\Q$docker_hash\E ";
941
942       if ($Job->{arvados_sdk_version}) {
943         $command .= $stdbuf;
944         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
945       } else {
946         $command .= "/bin/sh -c \'python -c " .
947             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
948             ">&2 2>/dev/null; " .
949             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
950             "if which stdbuf >/dev/null ; then " .
951             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
952             " else " .
953             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
954             " fi\'";
955       }
956     } else {
957       # Non-docker run
958       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
959       $command .= $stdbuf;
960       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
961     }
962
963     my @execargs = ('bash', '-c', $command);
964     srun (\@srunargs, \@execargs, undef, $build_script);
965     # exec() failed, we assume nothing happened.
966     die "srun() failed on build script\n";
967   }
968   close("writer");
969   if (!defined $childpid)
970   {
971     close $reader{$id};
972     delete $reader{$id};
973     next;
974   }
975   shift @freeslot;
976   $proc{$childpid} = {
977     jobstepidx => $id,
978     time => time,
979     slot => $childslot,
980     jobstepname => "$job_id.$id.$childpid",
981   };
982   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
983   $slot[$childslot]->{pid} = $childpid;
984
985   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
986   Log ($id, "child $childpid started on $childslotname");
987   $Jobstep->{starttime} = time;
988   $Jobstep->{node} = $childnode->{name};
989   $Jobstep->{slotindex} = $childslot;
990   delete $Jobstep->{stderr};
991   delete $Jobstep->{finishtime};
992   delete $Jobstep->{tempfail};
993
994   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
995   $Jobstep->{'arvados_task'}->save;
996
997   splice @jobstep_todo, $todo_ptr, 1;
998   --$todo_ptr;
999
1000   $progress_is_dirty = 1;
1001
1002   while (!@freeslot
1003          ||
1004          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1005   {
1006     last THISROUND if $main::please_freeze;
1007     if ($main::please_info)
1008     {
1009       $main::please_info = 0;
1010       freeze();
1011       create_output_collection();
1012       save_meta(1);
1013       update_progress_stats();
1014     }
1015     my $gotsome
1016         = readfrompipes ()
1017         + reapchildren ();
1018     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1019     {
1020       check_refresh_wanted();
1021       check_squeue();
1022       update_progress_stats();
1023     }
1024     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1025     {
1026       update_progress_stats();
1027     }
1028     if (!$gotsome) {
1029       select (undef, undef, undef, 0.1);
1030     }
1031     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1032                                         $_->{node}->{hold_count} < 4 } @slot);
1033     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1034         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1035     {
1036       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1037           .($thisround_failed+$thisround_succeeded)
1038           .") -- giving up on this round";
1039       Log (undef, $message);
1040       last THISROUND;
1041     }
1042
1043     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1044     for (my $i=$#freeslot; $i>=0; $i--) {
1045       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1046         push @holdslot, (splice @freeslot, $i, 1);
1047       }
1048     }
1049     for (my $i=$#holdslot; $i>=0; $i--) {
1050       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1051         push @freeslot, (splice @holdslot, $i, 1);
1052       }
1053     }
1054
1055     # give up if no nodes are succeeding
1056     if ($working_slot_count < 1) {
1057       Log(undef, "Every node has failed -- giving up");
1058       last THISROUND;
1059     }
1060   }
1061 }
1062
1063
1064 push @freeslot, splice @holdslot;
1065 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1066
1067
1068 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1069 while (%proc)
1070 {
1071   if ($main::please_continue) {
1072     $main::please_continue = 0;
1073     goto THISROUND;
1074   }
1075   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1076   readfrompipes ();
1077   if (!reapchildren())
1078   {
1079     check_refresh_wanted();
1080     check_squeue();
1081     update_progress_stats();
1082     select (undef, undef, undef, 0.1);
1083     killem (keys %proc) if $main::please_freeze;
1084   }
1085 }
1086
1087 update_progress_stats();
1088 freeze_if_want_freeze();
1089
1090
1091 if (!defined $main::success)
1092 {
1093   if (!@jobstep_todo) {
1094     $main::success = 1;
1095   } elsif ($working_slot_count < 1) {
1096     save_output_collection();
1097     save_meta();
1098     exit(EX_RETRY_UNLOCKED);
1099   } elsif ($thisround_succeeded == 0 &&
1100            ($thisround_failed == 0 || $thisround_failed > 4)) {
1101     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1102     Log (undef, $message);
1103     $main::success = 0;
1104   }
1105 }
1106
1107 goto ONELEVEL if !defined $main::success;
1108
1109
1110 release_allocation();
1111 freeze();
1112 my $collated_output = save_output_collection();
1113 Log (undef, "finish");
1114
1115 save_meta();
1116
1117 my $final_state;
1118 if ($collated_output && $main::success) {
1119   $final_state = 'Complete';
1120 } else {
1121   $final_state = 'Failed';
1122 }
1123 $Job->update_attributes('state' => $final_state);
1124
1125 exit (($final_state eq 'Complete') ? 0 : 1);
1126
1127
1128
1129 sub update_progress_stats
1130 {
1131   $progress_stats_updated = time;
1132   return if !$progress_is_dirty;
1133   my ($todo, $done, $running) = (scalar @jobstep_todo,
1134                                  scalar @jobstep_done,
1135                                  scalar keys(%proc));
1136   $Job->{'tasks_summary'} ||= {};
1137   $Job->{'tasks_summary'}->{'todo'} = $todo;
1138   $Job->{'tasks_summary'}->{'done'} = $done;
1139   $Job->{'tasks_summary'}->{'running'} = $running;
1140   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1141   Log (undef, "status: $done done, $running running, $todo todo");
1142   $progress_is_dirty = 0;
1143 }
1144
1145
1146
1147 sub reapchildren
1148 {
1149   my $children_reaped = 0;
1150   my @successful_task_uuids = ();
1151
1152   while((my $pid = waitpid (-1, WNOHANG)) > 0)
1153   {
1154     my $childstatus = $?;
1155
1156     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1157                     . "."
1158                     . $slot[$proc{$pid}->{slot}]->{cpu});
1159     my $jobstepidx = $proc{$pid}->{jobstepidx};
1160
1161     if (!WIFEXITED($childstatus))
1162     {
1163       # child did not exit (may be temporarily stopped)
1164       Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
1165       next;
1166     }
1167
1168     $children_reaped++;
1169     my $elapsed = time - $proc{$pid}->{time};
1170     my $Jobstep = $jobstep[$jobstepidx];
1171
1172     my $exitvalue = $childstatus >> 8;
1173     my $exitinfo = "exit ".exit_status_s($childstatus);
1174     $Jobstep->{'arvados_task'}->reload;
1175     my $task_success = $Jobstep->{'arvados_task'}->{success};
1176
1177     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1178
1179     if (!defined $task_success) {
1180       # task did not indicate one way or the other --> fail
1181       Log($jobstepidx, sprintf(
1182             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1183             exit_status_s($childstatus)));
1184       $Jobstep->{'arvados_task'}->{success} = 0;
1185       $Jobstep->{'arvados_task'}->save;
1186       $task_success = 0;
1187     }
1188
1189     if (!$task_success)
1190     {
1191       my $temporary_fail;
1192       $temporary_fail ||= $Jobstep->{tempfail};
1193       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1194
1195       ++$thisround_failed;
1196       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1197
1198       # Check for signs of a failed or misconfigured node
1199       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1200           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1201         # Don't count this against jobstep failure thresholds if this
1202         # node is already suspected faulty and srun exited quickly
1203         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1204             $elapsed < 5) {
1205           Log ($jobstepidx, "blaming failure on suspect node " .
1206                $slot[$proc{$pid}->{slot}]->{node}->{name});
1207           $temporary_fail ||= 1;
1208         }
1209         ban_node_by_slot($proc{$pid}->{slot});
1210       }
1211
1212       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1213                                 ++$Jobstep->{'failures'},
1214                                 $temporary_fail ? 'temporary' : 'permanent',
1215                                 $elapsed));
1216
1217       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1218         # Give up on this task, and the whole job
1219         $main::success = 0;
1220       }
1221       # Put this task back on the todo queue
1222       push @jobstep_todo, $jobstepidx;
1223       $Job->{'tasks_summary'}->{'failed'}++;
1224     }
1225     else # task_success
1226     {
1227       push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1228       ++$thisround_succeeded;
1229       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1230       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1231       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1232       push @jobstep_done, $jobstepidx;
1233       Log ($jobstepidx, "success in $elapsed seconds");
1234     }
1235     $Jobstep->{exitcode} = $childstatus;
1236     $Jobstep->{finishtime} = time;
1237     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1238     $Jobstep->{'arvados_task'}->save;
1239     process_stderr_final ($jobstepidx);
1240     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1241                               length($Jobstep->{'arvados_task'}->{output}),
1242                               $Jobstep->{'arvados_task'}->{output}));
1243
1244     close $reader{$jobstepidx};
1245     delete $reader{$jobstepidx};
1246     delete $slot[$proc{$pid}->{slot}]->{pid};
1247     push @freeslot, $proc{$pid}->{slot};
1248     delete $proc{$pid};
1249
1250     $progress_is_dirty = 1;
1251   }
1252
1253   if (scalar(@successful_task_uuids) > 0)
1254   {
1255     Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1256     # Load new tasks
1257     my $newtask_list = [];
1258     my $newtask_results;
1259     do {
1260       $newtask_results = api_call(
1261         "job_tasks/list",
1262         'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1263         'order' => 'qsequence',
1264         'offset' => scalar(@$newtask_list),
1265           );
1266       push(@$newtask_list, @{$newtask_results->{items}});
1267     } while (@{$newtask_results->{items}});
1268     Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1269     foreach my $arvados_task (@$newtask_list) {
1270       my $jobstep = {
1271         'level' => $arvados_task->{'sequence'},
1272         'failures' => 0,
1273         'arvados_task' => $arvados_task
1274       };
1275       push @jobstep, $jobstep;
1276       push @jobstep_todo, $#jobstep;
1277     }
1278   }
1279
1280   return $children_reaped;
1281 }
1282
1283 sub check_refresh_wanted
1284 {
1285   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1286   if (@stat &&
1287       $stat[9] > $latest_refresh &&
1288       # ...and we have actually locked the job record...
1289       $job_id eq $Job->{'uuid'}) {
1290     $latest_refresh = scalar time;
1291     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1292     for my $attr ('cancelled_at',
1293                   'cancelled_by_user_uuid',
1294                   'cancelled_by_client_uuid',
1295                   'state') {
1296       $Job->{$attr} = $Job2->{$attr};
1297     }
1298     if ($Job->{'state'} ne "Running") {
1299       if ($Job->{'state'} eq "Cancelled") {
1300         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1301       } else {
1302         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1303       }
1304       $main::success = 0;
1305       $main::please_freeze = 1;
1306     }
1307   }
1308 }
1309
1310 sub check_squeue
1311 {
1312   my $last_squeue_check = $squeue_checked;
1313
1314   # Do not call `squeue` or check the kill list more than once every
1315   # 15 seconds.
1316   return if $last_squeue_check > time - 15;
1317   $squeue_checked = time;
1318
1319   # Look for children from which we haven't received stderr data since
1320   # the last squeue check. If no such children exist, all procs are
1321   # alive and there's no need to even look at squeue.
1322   #
1323   # As long as the crunchstat poll interval (10s) is shorter than the
1324   # squeue check interval (15s) this should make the squeue check an
1325   # infrequent event.
1326   my $silent_procs = 0;
1327   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1328   {
1329     if (!exists($js->{stderr_at}))
1330     {
1331       $js->{stderr_at} = 0;
1332     }
1333     if ($js->{stderr_at} < $last_squeue_check)
1334     {
1335       $silent_procs++;
1336     }
1337   }
1338   return if $silent_procs == 0;
1339
1340   # use killem() on procs whose killtime is reached
1341   while (my ($pid, $procinfo) = each %proc)
1342   {
1343     my $js = $jobstep[$procinfo->{jobstepidx}];
1344     if (exists $procinfo->{killtime}
1345         && $procinfo->{killtime} <= time
1346         && $js->{stderr_at} < $last_squeue_check)
1347     {
1348       my $sincewhen = "";
1349       if ($js->{stderr_at}) {
1350         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1351       }
1352       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1353       killem ($pid);
1354     }
1355   }
1356
1357   if (!$have_slurm)
1358   {
1359     # here is an opportunity to check for mysterious problems with local procs
1360     return;
1361   }
1362
1363   # Get a list of steps still running.  Note: squeue(1) says --steps
1364   # selects a format (which we override anyway) and allows us to
1365   # specify which steps we're interested in (which we don't).
1366   # Importantly, it also changes the meaning of %j from "job name" to
1367   # "step name" and (although this isn't mentioned explicitly in the
1368   # docs) switches from "one line per job" mode to "one line per step"
1369   # mode. Without it, we'd just get a list of one job, instead of a
1370   # list of N steps.
1371   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1372   if ($? != 0)
1373   {
1374     Log(undef, "warning: squeue exit status $? ($!)");
1375     return;
1376   }
1377   chop @squeue;
1378
1379   # which of my jobsteps are running, according to squeue?
1380   my %ok;
1381   for my $jobstepname (@squeue)
1382   {
1383     $ok{$jobstepname} = 1;
1384   }
1385
1386   # Check for child procs >60s old and not mentioned by squeue.
1387   while (my ($pid, $procinfo) = each %proc)
1388   {
1389     if ($procinfo->{time} < time - 60
1390         && $procinfo->{jobstepname}
1391         && !exists $ok{$procinfo->{jobstepname}}
1392         && !exists $procinfo->{killtime})
1393     {
1394       # According to slurm, this task has ended (successfully or not)
1395       # -- but our srun child hasn't exited. First we must wait (30
1396       # seconds) in case this is just a race between communication
1397       # channels. Then, if our srun child process still hasn't
1398       # terminated, we'll conclude some slurm communication
1399       # error/delay has caused the task to die without notifying srun,
1400       # and we'll kill srun ourselves.
1401       $procinfo->{killtime} = time + 30;
1402       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1403     }
1404   }
1405 }
1406
1407
1408 sub release_allocation
1409 {
1410   if ($have_slurm)
1411   {
1412     Log (undef, "release job allocation");
1413     system "scancel $ENV{SLURM_JOB_ID}";
1414   }
1415 }
1416
1417
1418 sub readfrompipes
1419 {
1420   my $gotsome = 0;
1421   my %fd_job;
1422   my $sel = IO::Select->new();
1423   foreach my $jobstepidx (keys %reader)
1424   {
1425     my $fd = $reader{$jobstepidx};
1426     $sel->add($fd);
1427     $fd_job{$fd} = $jobstepidx;
1428
1429     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1430       $sel->add($stdout_fd);
1431       $fd_job{$stdout_fd} = $jobstepidx;
1432     }
1433   }
1434   # select on all reader fds with 0.1s timeout
1435   my @ready_fds = $sel->can_read(0.1);
1436   foreach my $fd (@ready_fds)
1437   {
1438     my $buf;
1439     if (0 < sysread ($fd, $buf, 65536))
1440     {
1441       $gotsome = 1;
1442       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1443
1444       my $jobstepidx = $fd_job{$fd};
1445       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1446         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1447         next;
1448       }
1449
1450       $jobstep[$jobstepidx]->{stderr_at} = time;
1451       $jobstep[$jobstepidx]->{stderr} .= $buf;
1452
1453       # Consume everything up to the last \n
1454       preprocess_stderr ($jobstepidx);
1455
1456       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1457       {
1458         # If we get a lot of stderr without a newline, chop off the
1459         # front to avoid letting our buffer grow indefinitely.
1460         substr ($jobstep[$jobstepidx]->{stderr},
1461                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1462       }
1463     }
1464   }
1465   return $gotsome;
1466 }
1467
1468
1469 # Consume all full lines of stderr for a jobstep. Everything after the
1470 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1471 # returning.
1472 sub preprocess_stderr
1473 {
1474   my $jobstepidx = shift;
1475
1476   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1477     my $line = $1;
1478     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1479     Log ($jobstepidx, "stderr $line");
1480     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1481       # whoa.
1482       $main::please_freeze = 1;
1483     }
1484     elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
1485       # Skip the following tempfail checks if this srun proc isn't
1486       # attached to a particular worker slot.
1487     }
1488     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1489       my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1490       $slot[$job_slot_index]->{node}->{fail_count}++;
1491       $jobstep[$jobstepidx]->{tempfail} = 1;
1492       ban_node_by_slot($job_slot_index);
1493     }
1494     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1495       $jobstep[$jobstepidx]->{tempfail} = 1;
1496       ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
1497     }
1498     elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1499       $jobstep[$jobstepidx]->{tempfail} = 1;
1500     }
1501   }
1502 }
1503
1504
1505 sub process_stderr_final
1506 {
1507   my $jobstepidx = shift;
1508   preprocess_stderr ($jobstepidx);
1509
1510   map {
1511     Log ($jobstepidx, "stderr $_");
1512   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1513   $jobstep[$jobstepidx]->{stderr} = '';
1514 }
1515
1516 sub fetch_block
1517 {
1518   my $hash = shift;
1519   my $keep;
1520   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1521     Log(undef, "fetch_block run error from arv-get $hash: $!");
1522     return undef;
1523   }
1524   my $output_block = "";
1525   while (1) {
1526     my $buf;
1527     my $bytes = sysread($keep, $buf, 1024 * 1024);
1528     if (!defined $bytes) {
1529       Log(undef, "fetch_block read error from arv-get: $!");
1530       $output_block = undef;
1531       last;
1532     } elsif ($bytes == 0) {
1533       # sysread returns 0 at the end of the pipe.
1534       last;
1535     } else {
1536       # some bytes were read into buf.
1537       $output_block .= $buf;
1538     }
1539   }
1540   close $keep;
1541   if ($?) {
1542     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1543     $output_block = undef;
1544   }
1545   return $output_block;
1546 }
1547
1548 # Create a collection by concatenating the output of all tasks (each
1549 # task's output is either a manifest fragment, a locator for a
1550 # manifest fragment stored in Keep, or nothing at all). Return the
1551 # portable_data_hash of the new collection.
1552 sub create_output_collection
1553 {
1554   Log (undef, "collate");
1555
1556   my ($child_out, $child_in);
1557   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1558 import arvados
1559 import sys
1560 print (arvados.api("v1").collections().
1561        create(body={"manifest_text": sys.stdin.read()}).
1562        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1563 }, retry_count());
1564
1565   my $task_idx = -1;
1566   my $manifest_size = 0;
1567   for (@jobstep)
1568   {
1569     ++$task_idx;
1570     my $output = $_->{'arvados_task'}->{output};
1571     next if (!defined($output));
1572     my $next_write;
1573     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1574       $next_write = fetch_block($output);
1575     } else {
1576       $next_write = $output;
1577     }
1578     if (defined($next_write)) {
1579       if (!defined(syswrite($child_in, $next_write))) {
1580         # There's been an error writing.  Stop the loop.
1581         # We'll log details about the exit code later.
1582         last;
1583       } else {
1584         $manifest_size += length($next_write);
1585       }
1586     } else {
1587       my $uuid = $_->{'arvados_task'}->{'uuid'};
1588       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1589       $main::success = 0;
1590     }
1591   }
1592   close($child_in);
1593   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1594
1595   my $joboutput;
1596   my $s = IO::Select->new($child_out);
1597   if ($s->can_read(120)) {
1598     sysread($child_out, $joboutput, 1024 * 1024);
1599     waitpid($pid, 0);
1600     if ($?) {
1601       Log(undef, "output collection creation exited " . exit_status_s($?));
1602       $joboutput = undef;
1603     } else {
1604       chomp($joboutput);
1605     }
1606   } else {
1607     Log (undef, "timed out while creating output collection");
1608     foreach my $signal (2, 2, 2, 15, 15, 9) {
1609       kill($signal, $pid);
1610       last if waitpid($pid, WNOHANG) == -1;
1611       sleep(1);
1612     }
1613   }
1614   close($child_out);
1615
1616   return $joboutput;
1617 }
1618
1619 # Calls create_output_collection, logs the result, and returns it.
1620 # If that was successful, save that as the output in the job record.
1621 sub save_output_collection {
1622   my $collated_output = create_output_collection();
1623
1624   if (!$collated_output) {
1625     Log(undef, "Failed to write output collection");
1626   }
1627   else {
1628     Log(undef, "job output $collated_output");
1629     $Job->update_attributes('output' => $collated_output);
1630   }
1631   return $collated_output;
1632 }
1633
1634 sub killem
1635 {
1636   foreach (@_)
1637   {
1638     my $sig = 2;                # SIGINT first
1639     if (exists $proc{$_}->{"sent_$sig"} &&
1640         time - $proc{$_}->{"sent_$sig"} > 4)
1641     {
1642       $sig = 15;                # SIGTERM if SIGINT doesn't work
1643     }
1644     if (exists $proc{$_}->{"sent_$sig"} &&
1645         time - $proc{$_}->{"sent_$sig"} > 4)
1646     {
1647       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1648     }
1649     if (!exists $proc{$_}->{"sent_$sig"})
1650     {
1651       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1652       kill $sig, $_;
1653       select (undef, undef, undef, 0.1);
1654       if ($sig == 2)
1655       {
1656         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1657       }
1658       $proc{$_}->{"sent_$sig"} = time;
1659       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1660     }
1661   }
1662 }
1663
1664
1665 sub fhbits
1666 {
1667   my($bits);
1668   for (@_) {
1669     vec($bits,fileno($_),1) = 1;
1670   }
1671   $bits;
1672 }
1673
1674
1675 # Send log output to Keep via arv-put.
1676 #
1677 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1678 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1679 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1680 # $log_pipe_pid is the pid of the arv-put subprocess.
1681 #
1682 # The only functions that should access these variables directly are:
1683 #
1684 # log_writer_start($logfilename)
1685 #     Starts an arv-put pipe, reading data on stdin and writing it to
1686 #     a $logfilename file in an output collection.
1687 #
1688 # log_writer_read_output([$timeout])
1689 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1690 #     Passes $timeout to the select() call, with a default of 0.01.
1691 #     Returns the result of the last read() call on $log_pipe_out, or
1692 #     -1 if read() wasn't called because select() timed out.
1693 #     Only other log_writer_* functions should need to call this.
1694 #
1695 # log_writer_send($txt)
1696 #     Writes $txt to the output log collection.
1697 #
1698 # log_writer_finish()
1699 #     Closes the arv-put pipe and returns the output that it produces.
1700 #
1701 # log_writer_is_active()
1702 #     Returns a true value if there is currently a live arv-put
1703 #     process, false otherwise.
1704 #
1705 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1706     $log_pipe_pid);
1707
1708 sub log_writer_start($)
1709 {
1710   my $logfilename = shift;
1711   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1712                         'arv-put',
1713                         '--stream',
1714                         '--retries', '3',
1715                         '--filename', $logfilename,
1716                         '-');
1717   $log_pipe_out_buf = "";
1718   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1719 }
1720
1721 sub log_writer_read_output {
1722   my $timeout = shift || 0.01;
1723   my $read = -1;
1724   while ($read && $log_pipe_out_select->can_read($timeout)) {
1725     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1726                  length($log_pipe_out_buf));
1727   }
1728   if (!defined($read)) {
1729     Log(undef, "error reading log manifest from arv-put: $!");
1730   }
1731   return $read;
1732 }
1733
1734 sub log_writer_send($)
1735 {
1736   my $txt = shift;
1737   print $log_pipe_in $txt;
1738   log_writer_read_output();
1739 }
1740
1741 sub log_writer_finish()
1742 {
1743   return unless $log_pipe_pid;
1744
1745   close($log_pipe_in);
1746
1747   my $logger_failed = 0;
1748   my $read_result = log_writer_read_output(120);
1749   if ($read_result == -1) {
1750     $logger_failed = -1;
1751     Log (undef, "timed out reading from 'arv-put'");
1752   } elsif ($read_result != 0) {
1753     $logger_failed = -2;
1754     Log(undef, "failed to read arv-put log manifest to EOF");
1755   }
1756
1757   waitpid($log_pipe_pid, 0);
1758   if ($?) {
1759     $logger_failed ||= $?;
1760     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1761   }
1762
1763   close($log_pipe_out);
1764   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1765   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1766       $log_pipe_out_select = undef;
1767
1768   return $arv_put_output;
1769 }
1770
1771 sub log_writer_is_active() {
1772   return $log_pipe_pid;
1773 }
1774
1775 sub Log                         # ($jobstepidx, $logmessage)
1776 {
1777   my ($jobstepidx, $logmessage) = @_;
1778   if ($logmessage =~ /\n/) {
1779     for my $line (split (/\n/, $_[1])) {
1780       Log ($jobstepidx, $line);
1781     }
1782     return;
1783   }
1784   my $fh = select STDERR; $|=1; select $fh;
1785   my $task_qseq = '';
1786   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1787     $task_qseq = $jobstepidx;
1788   }
1789   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1790   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1791   $message .= "\n";
1792   my $datetime;
1793   if (log_writer_is_active() || -t STDERR) {
1794     my @gmtime = gmtime;
1795     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1796                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1797   }
1798   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1799
1800   if (log_writer_is_active()) {
1801     log_writer_send($datetime . " " . $message);
1802   }
1803 }
1804
1805
1806 sub croak
1807 {
1808   my ($package, $file, $line) = caller;
1809   my $message = "@_ at $file line $line\n";
1810   Log (undef, $message);
1811   freeze() if @jobstep_todo;
1812   create_output_collection() if @jobstep_todo;
1813   cleanup();
1814   save_meta();
1815   die;
1816 }
1817
1818
1819 sub cleanup
1820 {
1821   return unless $Job;
1822   if ($Job->{'state'} eq 'Cancelled') {
1823     $Job->update_attributes('finished_at' => scalar gmtime);
1824   } else {
1825     $Job->update_attributes('state' => 'Failed');
1826   }
1827 }
1828
1829
1830 sub save_meta
1831 {
1832   my $justcheckpoint = shift; # false if this will be the last meta saved
1833   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1834   return unless log_writer_is_active();
1835   my $log_manifest = log_writer_finish();
1836   return unless defined($log_manifest);
1837
1838   if ($Job->{log}) {
1839     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1840     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1841   }
1842
1843   my $log_coll = api_call(
1844     "collections/create", ensure_unique_name => 1, collection => {
1845       manifest_text => $log_manifest,
1846       owner_uuid => $Job->{owner_uuid},
1847       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1848     });
1849   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1850   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1851 }
1852
1853
1854 sub freeze_if_want_freeze
1855 {
1856   if ($main::please_freeze)
1857   {
1858     release_allocation();
1859     if (@_)
1860     {
1861       # kill some srun procs before freeze+stop
1862       map { $proc{$_} = {} } @_;
1863       while (%proc)
1864       {
1865         killem (keys %proc);
1866         select (undef, undef, undef, 0.1);
1867         my $died;
1868         while (($died = waitpid (-1, WNOHANG)) > 0)
1869         {
1870           delete $proc{$died};
1871         }
1872       }
1873     }
1874     freeze();
1875     create_output_collection();
1876     cleanup();
1877     save_meta();
1878     exit 1;
1879   }
1880 }
1881
1882
1883 sub freeze
1884 {
1885   Log (undef, "Freeze not implemented");
1886   return;
1887 }
1888
1889
1890 sub thaw
1891 {
1892   croak ("Thaw not implemented");
1893 }
1894
1895
1896 sub freezequote
1897 {
1898   my $s = shift;
1899   $s =~ s/\\/\\\\/g;
1900   $s =~ s/\n/\\n/g;
1901   return $s;
1902 }
1903
1904
1905 sub freezeunquote
1906 {
1907   my $s = shift;
1908   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1909   return $s;
1910 }
1911
1912
1913 sub srun_sync
1914 {
1915   my $srunargs = shift;
1916   my $execargs = shift;
1917   my $opts = shift || {};
1918   my $stdin = shift;
1919
1920   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1921   Log (undef, "$label: start");
1922
1923   my ($stderr_r, $stderr_w);
1924   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1925
1926   my ($stdout_r, $stdout_w);
1927   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1928
1929   my $srunpid = fork();
1930   if ($srunpid == 0)
1931   {
1932     close($stderr_r);
1933     close($stdout_r);
1934     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1935     fcntl($stdout_w, F_SETFL, 0) or croak($!);
1936     open(STDERR, ">&", $stderr_w);
1937     open(STDOUT, ">&", $stdout_w);
1938     srun ($srunargs, $execargs, $opts, $stdin);
1939     exit (1);
1940   }
1941   close($stderr_w);
1942   close($stdout_w);
1943
1944   set_nonblocking($stderr_r);
1945   set_nonblocking($stdout_r);
1946
1947   # Add entries to @jobstep and %proc so check_squeue() and
1948   # freeze_if_want_freeze() can treat it like a job task process.
1949   push @jobstep, {
1950     stderr => '',
1951     stderr_at => 0,
1952     stderr_captured => '',
1953     stdout_r => $stdout_r,
1954     stdout_captured => '',
1955   };
1956   my $jobstepidx = $#jobstep;
1957   $proc{$srunpid} = {
1958     jobstepidx => $jobstepidx,
1959   };
1960   $reader{$jobstepidx} = $stderr_r;
1961
1962   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
1963     my $busy = readfrompipes();
1964     if (!$busy || ($latest_refresh + 2 < scalar time)) {
1965       check_refresh_wanted();
1966       check_squeue();
1967     }
1968     if (!$busy) {
1969       select(undef, undef, undef, 0.1);
1970     }
1971     killem(keys %proc) if $main::please_freeze;
1972   }
1973   my $exited = $?;
1974
1975   1 while readfrompipes();
1976   process_stderr_final ($jobstepidx);
1977
1978   Log (undef, "$label: exit ".exit_status_s($exited));
1979
1980   close($stdout_r);
1981   close($stderr_r);
1982   delete $proc{$srunpid};
1983   delete $reader{$jobstepidx};
1984
1985   my $j = pop @jobstep;
1986   return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
1987 }
1988
1989
1990 sub srun
1991 {
1992   my $srunargs = shift;
1993   my $execargs = shift;
1994   my $opts = shift || {};
1995   my $stdin = shift;
1996   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1997
1998   $Data::Dumper::Terse = 1;
1999   $Data::Dumper::Indent = 0;
2000   my $show_cmd = Dumper($args);
2001   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2002   $show_cmd =~ s/\n/ /g;
2003   if ($opts->{fork}) {
2004     Log(undef, "starting: $show_cmd");
2005   } else {
2006     # This is a child process: parent is in charge of reading our
2007     # stderr and copying it to Log() if needed.
2008     warn "starting: $show_cmd\n";
2009   }
2010
2011   if (defined $stdin) {
2012     my $child = open STDIN, "-|";
2013     defined $child or die "no fork: $!";
2014     if ($child == 0) {
2015       print $stdin or die $!;
2016       close STDOUT or die $!;
2017       exit 0;
2018     }
2019   }
2020
2021   return system (@$args) if $opts->{fork};
2022
2023   exec @$args;
2024   warn "ENV size is ".length(join(" ",%ENV));
2025   die "exec failed: $!: @$args";
2026 }
2027
2028
2029 sub ban_node_by_slot {
2030   # Don't start any new jobsteps on this node for 60 seconds
2031   my $slotid = shift;
2032   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2033   $slot[$slotid]->{node}->{hold_count}++;
2034   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2035 }
2036
2037 sub must_lock_now
2038 {
2039   my ($lockfile, $error_message) = @_;
2040   open L, ">", $lockfile or croak("$lockfile: $!");
2041   if (!flock L, LOCK_EX|LOCK_NB) {
2042     croak("Can't lock $lockfile: $error_message\n");
2043   }
2044 }
2045
2046 sub find_docker_image {
2047   # Given a Keep locator, check to see if it contains a Docker image.
2048   # If so, return its stream name and Docker hash.
2049   # If not, return undef for both values.
2050   my $locator = shift;
2051   my ($streamname, $filename);
2052   my $image = api_call("collections/get", uuid => $locator);
2053   if ($image) {
2054     foreach my $line (split(/\n/, $image->{manifest_text})) {
2055       my @tokens = split(/\s+/, $line);
2056       next if (!@tokens);
2057       $streamname = shift(@tokens);
2058       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2059         if (defined($filename)) {
2060           return (undef, undef);  # More than one file in the Collection.
2061         } else {
2062           $filename = (split(/:/, $filedata, 3))[2];
2063         }
2064       }
2065     }
2066   }
2067   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
2068     return ($streamname, $1);
2069   } else {
2070     return (undef, undef);
2071   }
2072 }
2073
2074 sub retry_count {
2075   # Calculate the number of times an operation should be retried,
2076   # assuming exponential backoff, and that we're willing to retry as
2077   # long as tasks have been running.  Enforce a minimum of 3 retries.
2078   my ($starttime, $endtime, $timediff, $retries);
2079   if (@jobstep) {
2080     $starttime = $jobstep[0]->{starttime};
2081     $endtime = $jobstep[-1]->{finishtime};
2082   }
2083   if (!defined($starttime)) {
2084     $timediff = 0;
2085   } elsif (!defined($endtime)) {
2086     $timediff = time - $starttime;
2087   } else {
2088     $timediff = ($endtime - $starttime) - (time - $endtime);
2089   }
2090   if ($timediff > 0) {
2091     $retries = int(log($timediff) / log(2));
2092   } else {
2093     $retries = 1;  # Use the minimum.
2094   }
2095   return ($retries > 3) ? $retries : 3;
2096 }
2097
2098 sub retry_op {
2099   # Pass in two function references.
2100   # This method will be called with the remaining arguments.
2101   # If it dies, retry it with exponential backoff until it succeeds,
2102   # or until the current retry_count is exhausted.  After each failure
2103   # that can be retried, the second function will be called with
2104   # the current try count (0-based), next try time, and error message.
2105   my $operation = shift;
2106   my $retry_callback = shift;
2107   my $retries = retry_count();
2108   foreach my $try_count (0..$retries) {
2109     my $next_try = time + (2 ** $try_count);
2110     my $result = eval { $operation->(@_); };
2111     if (!$@) {
2112       return $result;
2113     } elsif ($try_count < $retries) {
2114       $retry_callback->($try_count, $next_try, $@);
2115       my $sleep_time = $next_try - time;
2116       sleep($sleep_time) if ($sleep_time > 0);
2117     }
2118   }
2119   # Ensure the error message ends in a newline, so Perl doesn't add
2120   # retry_op's line number to it.
2121   chomp($@);
2122   die($@ . "\n");
2123 }
2124
2125 sub api_call {
2126   # Pass in a /-separated API method name, and arguments for it.
2127   # This function will call that method, retrying as needed until
2128   # the current retry_count is exhausted, with a log on the first failure.
2129   my $method_name = shift;
2130   my $log_api_retry = sub {
2131     my ($try_count, $next_try_at, $errmsg) = @_;
2132     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2133     $errmsg =~ s/\s/ /g;
2134     $errmsg =~ s/\s+$//;
2135     my $retry_msg;
2136     if ($next_try_at < time) {
2137       $retry_msg = "Retrying.";
2138     } else {
2139       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2140       $retry_msg = "Retrying at $next_try_fmt.";
2141     }
2142     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2143   };
2144   my $method = $arv;
2145   foreach my $key (split(/\//, $method_name)) {
2146     $method = $method->{$key};
2147   }
2148   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2149 }
2150
2151 sub exit_status_s {
2152   # Given a $?, return a human-readable exit code string like "0" or
2153   # "1" or "0 with signal 1" or "1 with signal 11".
2154   my $exitcode = shift;
2155   my $s = $exitcode >> 8;
2156   if ($exitcode & 0x7f) {
2157     $s .= " with signal " . ($exitcode & 0x7f);
2158   }
2159   if ($exitcode & 0x80) {
2160     $s .= " with core dump";
2161   }
2162   return $s;
2163 }
2164
2165 sub handle_readall {
2166   # Pass in a glob reference to a file handle.
2167   # Read all its contents and return them as a string.
2168   my $fh_glob_ref = shift;
2169   local $/ = undef;
2170   return <$fh_glob_ref>;
2171 }
2172
2173 sub tar_filename_n {
2174   my $n = shift;
2175   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2176 }
2177
2178 sub add_git_archive {
2179   # Pass in a git archive command as a string or list, a la system().
2180   # This method will save its output to be included in the archive sent to the
2181   # build script.
2182   my $git_input;
2183   $git_tar_count++;
2184   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2185     croak("Failed to save git archive: $!");
2186   }
2187   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2188   close($git_input);
2189   waitpid($git_pid, 0);
2190   close(GIT_ARCHIVE);
2191   if ($?) {
2192     croak("Failed to save git archive: git exited " . exit_status_s($?));
2193   }
2194 }
2195
2196 sub combined_git_archive {
2197   # Combine all saved tar archives into a single archive, then return its
2198   # contents in a string.  Return undef if no archives have been saved.
2199   if ($git_tar_count < 1) {
2200     return undef;
2201   }
2202   my $base_tar_name = tar_filename_n(1);
2203   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2204     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2205     if ($tar_exit != 0) {
2206       croak("Error preparing build archive: tar -A exited " .
2207             exit_status_s($tar_exit));
2208     }
2209   }
2210   if (!open(GIT_TAR, "<", $base_tar_name)) {
2211     croak("Could not open build archive: $!");
2212   }
2213   my $tar_contents = handle_readall(\*GIT_TAR);
2214   close(GIT_TAR);
2215   return $tar_contents;
2216 }
2217
2218 sub set_nonblocking {
2219   my $fh = shift;
2220   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2221   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2222 }
2223
2224 __DATA__
2225 #!/usr/bin/env perl
2226 #
2227 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2228 # server invokes this script on individual compute nodes, or localhost if we're
2229 # running a job locally.  It gets called in two modes:
2230 #
2231 # * No arguments: Installation mode.  Read a tar archive from the DATA
2232 #   file handle; it includes the Crunch script's source code, and
2233 #   maybe SDKs as well.  Those should be installed in the proper
2234 #   locations.  This runs outside of any Docker container, so don't try to
2235 #   introspect Crunch's runtime environment.
2236 #
2237 # * With arguments: Crunch script run mode.  This script should set up the
2238 #   environment, then run the command specified in the arguments.  This runs
2239 #   inside any Docker container.
2240
2241 use Fcntl ':flock';
2242 use File::Path qw( make_path remove_tree );
2243 use POSIX qw(getcwd);
2244
2245 use constant TASK_TEMPFAIL => 111;
2246
2247 # Map SDK subdirectories to the path environments they belong to.
2248 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2249
2250 my $destdir = $ENV{"CRUNCH_SRC"};
2251 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2252 my $repo = $ENV{"CRUNCH_SRC_URL"};
2253 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2254 my $job_work = $ENV{"JOB_WORK"};
2255 my $task_work = $ENV{"TASK_WORK"};
2256
2257 open(STDOUT_ORIG, ">&", STDOUT);
2258 open(STDERR_ORIG, ">&", STDERR);
2259
2260 for my $dir ($destdir, $job_work, $task_work) {
2261   if ($dir) {
2262     make_path $dir;
2263     -e $dir or die "Failed to create temporary directory ($dir): $!";
2264   }
2265 }
2266
2267 if ($task_work) {
2268   remove_tree($task_work, {keep_root => 1});
2269 }
2270
2271 ### Crunch script run mode
2272 if (@ARGV) {
2273   # We want to do routine logging during task 0 only.  This gives the user
2274   # the information they need, but avoids repeating the information for every
2275   # task.
2276   my $Log;
2277   if ($ENV{TASK_SEQUENCE} eq "0") {
2278     $Log = sub {
2279       my $msg = shift;
2280       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2281     };
2282   } else {
2283     $Log = sub { };
2284   }
2285
2286   my $python_src = "$install_dir/python";
2287   my $venv_dir = "$job_work/.arvados.venv";
2288   my $venv_built = -e "$venv_dir/bin/activate";
2289   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2290     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2291                  "--python=python2.7", $venv_dir);
2292     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2293     $venv_built = 1;
2294     $Log->("Built Python SDK virtualenv");
2295   }
2296
2297   my @pysdk_version_cmd = ("python", "-c",
2298     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2299   if ($venv_built) {
2300     $Log->("Running in Python SDK virtualenv");
2301     @pysdk_version_cmd = ();
2302     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2303     @ARGV = ("/bin/sh", "-ec",
2304              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2305   } elsif (-d $python_src) {
2306     $Log->("Warning: virtualenv not found inside Docker container default " .
2307            "\$PATH. Can't install Python SDK.");
2308   }
2309
2310   if (@pysdk_version_cmd) {
2311     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2312     my $pysdk_version = <$pysdk_version_pipe>;
2313     close($pysdk_version_pipe);
2314     if ($? == 0) {
2315       chomp($pysdk_version);
2316       $Log->("Using Arvados SDK version $pysdk_version");
2317     } else {
2318       # A lot could've gone wrong here, but pretty much all of it means that
2319       # Python won't be able to load the Arvados SDK.
2320       $Log->("Warning: Arvados SDK not found");
2321     }
2322   }
2323
2324   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2325     my $sdk_path = "$install_dir/$sdk_dir";
2326     if (-d $sdk_path) {
2327       if ($ENV{$sdk_envkey}) {
2328         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2329       } else {
2330         $ENV{$sdk_envkey} = $sdk_path;
2331       }
2332       $Log->("Arvados SDK added to %s", $sdk_envkey);
2333     }
2334   }
2335
2336   exec(@ARGV);
2337   die "Cannot exec `@ARGV`: $!";
2338 }
2339
2340 ### Installation mode
2341 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2342 flock L, LOCK_EX;
2343 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2344   # This exact git archive (source + arvados sdk) is already installed
2345   # here, so there's no need to reinstall it.
2346
2347   # We must consume our DATA section, though: otherwise the process
2348   # feeding it to us will get SIGPIPE.
2349   my $buf;
2350   while (read(DATA, $buf, 65536)) { }
2351
2352   exit(0);
2353 }
2354
2355 unlink "$destdir.archive_hash";
2356 mkdir $destdir;
2357
2358 do {
2359   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2360   local $SIG{PIPE} = "IGNORE";
2361   warn "Extracting archive: $archive_hash\n";
2362   # --ignore-zeros is necessary sometimes: depending on how much NUL
2363   # padding tar -A put on our combined archive (which in turn depends
2364   # on the length of the component archives) tar without
2365   # --ignore-zeros will exit before consuming stdin and cause close()
2366   # to fail on the resulting SIGPIPE.
2367   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2368     die "Error launching 'tar -xC $destdir': $!";
2369   }
2370   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2371   # get SIGPIPE.  We must feed it data incrementally.
2372   my $tar_input;
2373   while (read(DATA, $tar_input, 65536)) {
2374     print TARX $tar_input;
2375   }
2376   if(!close(TARX)) {
2377     die "'tar -xC $destdir' exited $?: $!";
2378   }
2379 };
2380
2381 mkdir $install_dir;
2382
2383 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2384 if (-d $sdk_root) {
2385   foreach my $sdk_lang (("python",
2386                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2387     if (-d "$sdk_root/$sdk_lang") {
2388       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2389         die "Failed to install $sdk_lang SDK: $!";
2390       }
2391     }
2392   }
2393 }
2394
2395 my $python_dir = "$install_dir/python";
2396 if ((-d $python_dir) and can_run("python2.7")) {
2397   open(my $egg_info_pipe, "-|",
2398        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2399   my @egg_info_errors = <$egg_info_pipe>;
2400   close($egg_info_pipe);
2401
2402   if ($?) {
2403     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2404       # egg_info apparently failed because it couldn't ask git for a build tag.
2405       # Specify no build tag.
2406       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2407       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2408       close($pysdk_cfg);
2409     } else {
2410       my $egg_info_exit = $? >> 8;
2411       foreach my $errline (@egg_info_errors) {
2412         warn $errline;
2413       }
2414       warn "python setup.py egg_info failed: exit $egg_info_exit";
2415       exit ($egg_info_exit || 1);
2416     }
2417   }
2418 }
2419
2420 # Hide messages from the install script (unless it fails: shell_or_die
2421 # will show $destdir.log in that case).
2422 open(STDOUT, ">>", "$destdir.log");
2423 open(STDERR, ">&", STDOUT);
2424
2425 if (-e "$destdir/crunch_scripts/install") {
2426     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2427 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2428     # Old version
2429     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2430 } elsif (-e "./install.sh") {
2431     shell_or_die (undef, "./install.sh", $install_dir);
2432 }
2433
2434 if ($archive_hash) {
2435     unlink "$destdir.archive_hash.new";
2436     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2437     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2438 }
2439
2440 close L;
2441
2442 sub can_run {
2443   my $command_name = shift;
2444   open(my $which, "-|", "which", $command_name);
2445   while (<$which>) { }
2446   close($which);
2447   return ($? == 0);
2448 }
2449
2450 sub shell_or_die
2451 {
2452   my $exitcode = shift;
2453
2454   if ($ENV{"DEBUG"}) {
2455     print STDERR "@_\n";
2456   }
2457   if (system (@_) != 0) {
2458     my $err = $!;
2459     my $code = $?;
2460     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2461     open STDERR, ">&STDERR_ORIG";
2462     system ("cat $destdir.log >&2");
2463     warn "@_ failed ($err): $exitstatus";
2464     if (defined($exitcode)) {
2465       exit $exitcode;
2466     }
2467     else {
2468       exit (($code >> 8) || 1);
2469     }
2470   }
2471 }
2472
2473 __DATA__