Merge branch '8815-crunchrunner-everywhere' into 8654-arv-jobs-cwl-runner
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 $ENV{"HOST_CRUNCHRUNNER_BIN"} ||= `which crunchrunner`;
113 unless (defined($ENV{"HOST_CERTS"})) {
114   if (-f "/etc/ssl/certs/ca-certificates.crt") {
115     $ENV{"HOST_CERTS"} = "/etc/ssl/certs/ca-certificates.crt";
116   } elsif (-f "/etc/pki/tls/certs/ca-bundle.crt") {
117     $ENV{"HOST_CERTS"} = "/etc/pki/tls/certs/ca-bundle.crt";
118   }
119 }
120
121 # Create the tmp directory if it does not exist
122 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
123   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
124 }
125
126 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
127 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
128 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
129 mkdir ($ENV{"JOB_WORK"});
130
131 my %proc;
132 my $force_unlock;
133 my $git_dir;
134 my $jobspec;
135 my $job_api_token;
136 my $no_clear_tmp;
137 my $resume_stash;
138 my $cgroup_root = "/sys/fs/cgroup";
139 my $docker_bin = "docker.io";
140 my $docker_run_args = "";
141 GetOptions('force-unlock' => \$force_unlock,
142            'git-dir=s' => \$git_dir,
143            'job=s' => \$jobspec,
144            'job-api-token=s' => \$job_api_token,
145            'no-clear-tmp' => \$no_clear_tmp,
146            'resume-stash=s' => \$resume_stash,
147            'cgroup-root=s' => \$cgroup_root,
148            'docker-bin=s' => \$docker_bin,
149            'docker-run-args=s' => \$docker_run_args,
150     );
151
152 if (defined $job_api_token) {
153   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
154 }
155
156 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
157
158
159 $SIG{'USR1'} = sub
160 {
161   $main::ENV{CRUNCH_DEBUG} = 1;
162 };
163 $SIG{'USR2'} = sub
164 {
165   $main::ENV{CRUNCH_DEBUG} = 0;
166 };
167
168 my $arv = Arvados->new('apiVersion' => 'v1');
169
170 my $Job;
171 my $job_id;
172 my $dbh;
173 my $sth;
174 my @jobstep;
175
176 my $local_job;
177 if ($jobspec =~ /^[-a-z\d]+$/)
178 {
179   # $jobspec is an Arvados UUID, not a JSON job specification
180   $Job = api_call("jobs/get", uuid => $jobspec);
181   $local_job = 0;
182 }
183 else
184 {
185   $local_job = JSON::decode_json($jobspec);
186 }
187
188
189 # Make sure our workers (our slurm nodes, localhost, or whatever) are
190 # at least able to run basic commands: they aren't down or severely
191 # misconfigured.
192 my $cmd = ['true'];
193 if (($Job || $local_job)->{docker_image_locator}) {
194   $cmd = [$docker_bin, 'ps', '-q'];
195 }
196 Log(undef, "Sanity check is `@$cmd`");
197 my ($exited, $stdout, $stderr) = srun_sync(
198   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
199   $cmd,
200   {label => "sanity check"});
201 if ($exited != 0) {
202   Log(undef, "Sanity check failed: ".exit_status_s($exited));
203   exit EX_TEMPFAIL;
204 }
205 Log(undef, "Sanity check OK");
206
207
208 my $User = api_call("users/current");
209
210 if (!$local_job) {
211   if (!$force_unlock) {
212     # Claim this job, and make sure nobody else does
213     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
214     if ($@) {
215       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
216       exit EX_TEMPFAIL;
217     };
218   }
219 }
220 else
221 {
222   if (!$resume_stash)
223   {
224     map { croak ("No $_ specified") unless $local_job->{$_} }
225     qw(script script_version script_parameters);
226   }
227
228   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
229   $local_job->{'started_at'} = gmtime;
230   $local_job->{'state'} = 'Running';
231
232   $Job = api_call("jobs/create", job => $local_job);
233 }
234 $job_id = $Job->{'uuid'};
235
236 my $keep_logfile = $job_id . '.log.txt';
237 log_writer_start($keep_logfile);
238
239 $Job->{'runtime_constraints'} ||= {};
240 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
241 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
242
243 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
244 if ($? == 0) {
245   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
246   chomp($gem_versions);
247   chop($gem_versions);  # Closing parentheses
248 } else {
249   $gem_versions = "";
250 }
251 Log(undef,
252     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
253
254 Log (undef, "check slurm allocation");
255 my @slot;
256 my @node;
257 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
258 my @sinfo;
259 if (!$have_slurm)
260 {
261   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
262   push @sinfo, "$localcpus localhost";
263 }
264 if (exists $ENV{SLURM_NODELIST})
265 {
266   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
267 }
268 foreach (@sinfo)
269 {
270   my ($ncpus, $slurm_nodelist) = split;
271   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
272
273   my @nodelist;
274   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
275   {
276     my $nodelist = $1;
277     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
278     {
279       my $ranges = $1;
280       foreach (split (",", $ranges))
281       {
282         my ($a, $b);
283         if (/(\d+)-(\d+)/)
284         {
285           $a = $1;
286           $b = $2;
287         }
288         else
289         {
290           $a = $_;
291           $b = $_;
292         }
293         push @nodelist, map {
294           my $n = $nodelist;
295           $n =~ s/\[[-,\d]+\]/$_/;
296           $n;
297         } ($a..$b);
298       }
299     }
300     else
301     {
302       push @nodelist, $nodelist;
303     }
304   }
305   foreach my $nodename (@nodelist)
306   {
307     Log (undef, "node $nodename - $ncpus slots");
308     my $node = { name => $nodename,
309                  ncpus => $ncpus,
310                  # The number of consecutive times a task has been dispatched
311                  # to this node and failed.
312                  losing_streak => 0,
313                  # The number of consecutive times that SLURM has reported
314                  # a node failure since the last successful task.
315                  fail_count => 0,
316                  # Don't dispatch work to this node until this time
317                  # (in seconds since the epoch) has passed.
318                  hold_until => 0 };
319     foreach my $cpu (1..$ncpus)
320     {
321       push @slot, { node => $node,
322                     cpu => $cpu };
323     }
324   }
325   push @node, @nodelist;
326 }
327
328
329
330 # Ensure that we get one jobstep running on each allocated node before
331 # we start overloading nodes with concurrent steps
332
333 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
334
335
336 $Job->update_attributes(
337   'tasks_summary' => { 'failed' => 0,
338                        'todo' => 1,
339                        'running' => 0,
340                        'done' => 0 });
341
342 Log (undef, "start");
343 $SIG{'INT'} = sub { $main::please_freeze = 1; };
344 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
345 $SIG{'TERM'} = \&croak;
346 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
347 $SIG{'ALRM'} = sub { $main::please_info = 1; };
348 $SIG{'CONT'} = sub { $main::please_continue = 1; };
349 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
350
351 $main::please_freeze = 0;
352 $main::please_info = 0;
353 $main::please_continue = 0;
354 $main::please_refresh = 0;
355 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
356
357 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
358 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
359 $ENV{"JOB_UUID"} = $job_id;
360
361
362 my @jobstep_todo = ();
363 my @jobstep_done = ();
364 my @jobstep_tomerge = ();
365 my $jobstep_tomerge_level = 0;
366 my $squeue_checked = 0;
367 my $latest_refresh = scalar time;
368
369
370
371 if (defined $Job->{thawedfromkey})
372 {
373   thaw ($Job->{thawedfromkey});
374 }
375 else
376 {
377   my $first_task = api_call("job_tasks/create", job_task => {
378     'job_uuid' => $Job->{'uuid'},
379     'sequence' => 0,
380     'qsequence' => 0,
381     'parameters' => {},
382   });
383   push @jobstep, { 'level' => 0,
384                    'failures' => 0,
385                    'arvados_task' => $first_task,
386                  };
387   push @jobstep_todo, 0;
388 }
389
390
391 if (!$have_slurm)
392 {
393   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
394 }
395
396 my $build_script = handle_readall(\*DATA);
397 my $nodelist = join(",", @node);
398 my $git_tar_count = 0;
399
400 if (!defined $no_clear_tmp) {
401   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
402   # up work directories crunch_tmp/work, crunch_tmp/opt,
403   # crunch_tmp/src*.
404   #
405   # TODO: When #5036 is done and widely deployed, we can limit mount's
406   # -t option to simply fuse.keep.
407   my ($exited, $stdout, $stderr) = srun_sync(
408     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
409     ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
410     {label => "clean work dirs"});
411   if ($exited != 0) {
412     exit(EX_RETRY_UNLOCKED);
413   }
414 }
415
416 # If this job requires a Docker image, install that.
417 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
418 if ($docker_locator = $Job->{docker_image_locator}) {
419   Log (undef, "Install docker image $docker_locator");
420   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
421   if (!$docker_hash)
422   {
423     croak("No Docker image hash found from locator $docker_locator");
424   }
425   Log (undef, "docker image hash is $docker_hash");
426   $docker_stream =~ s/^\.//;
427   my $docker_install_script = qq{
428 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
429     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
430 fi
431 };
432
433   my ($exited, $stdout, $stderr) = srun_sync(
434     ["srun", "--nodelist=" . join(',', @node)],
435     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
436     {label => "load docker image"});
437   if ($exited != 0)
438   {
439     exit(EX_RETRY_UNLOCKED);
440   }
441
442   # Determine whether this version of Docker supports memory+swap limits.
443   ($exited, $stdout, $stderr) = srun_sync(
444     ["srun", "--nodes=1"],
445     [$docker_bin, 'run', '--help'],
446     {label => "check --memory-swap feature"});
447   $docker_limitmem = ($stdout =~ /--memory-swap/);
448
449   # Find a non-root Docker user to use.
450   # Tries the default user for the container, then 'crunch', then 'nobody',
451   # testing for whether the actual user id is non-zero.  This defends against
452   # mistakes but not malice, but we intend to harden the security in the future
453   # so we don't want anyone getting used to their jobs running as root in their
454   # Docker containers.
455   my @tryusers = ("", "crunch", "nobody");
456   foreach my $try_user (@tryusers) {
457     my $label;
458     my $try_user_arg;
459     if ($try_user eq "") {
460       $label = "check whether default user is UID 0";
461       $try_user_arg = "";
462     } else {
463       $label = "check whether user '$try_user' is UID 0";
464       $try_user_arg = "--user=$try_user";
465     }
466     my ($exited, $stdout, $stderr) = srun_sync(
467       ["srun", "--nodes=1"],
468       ["/bin/sh", "-ec",
469        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
470       {label => $label});
471     chomp($stdout);
472     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
473       $dockeruserarg = $try_user_arg;
474       if ($try_user eq "") {
475         Log(undef, "Container will run with default user");
476       } else {
477         Log(undef, "Container will run with $dockeruserarg");
478       }
479       last;
480     }
481   }
482
483   if (!defined $dockeruserarg) {
484     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
485   }
486
487   if ($Job->{arvados_sdk_version}) {
488     # The job also specifies an Arvados SDK version.  Add the SDKs to the
489     # tar file for the build script to install.
490     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
491                        $Job->{arvados_sdk_version}));
492     add_git_archive("git", "--git-dir=$git_dir", "archive",
493                     "--prefix=.arvados.sdk/",
494                     $Job->{arvados_sdk_version}, "sdk");
495   }
496 }
497
498 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
499   # If script_version looks like an absolute path, *and* the --git-dir
500   # argument was not given -- which implies we were not invoked by
501   # crunch-dispatch -- we will use the given path as a working
502   # directory instead of resolving script_version to a git commit (or
503   # doing anything else with git).
504   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
505   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
506 }
507 else {
508   # Resolve the given script_version to a git commit sha1. Also, if
509   # the repository is remote, clone it into our local filesystem: this
510   # ensures "git archive" will work, and is necessary to reliably
511   # resolve a symbolic script_version like "master^".
512   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
513
514   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
515
516   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
517
518   # If we're running under crunch-dispatch, it will have already
519   # pulled the appropriate source tree into its own repository, and
520   # given us that repo's path as $git_dir.
521   #
522   # If we're running a "local" job, we might have to fetch content
523   # from a remote repository.
524   #
525   # (Currently crunch-dispatch gives a local path with --git-dir, but
526   # we might as well accept URLs there too in case it changes its
527   # mind.)
528   my $repo = $git_dir || $Job->{'repository'};
529
530   # Repository can be remote or local. If remote, we'll need to fetch it
531   # to a local dir before doing `git log` et al.
532   my $repo_location;
533
534   if ($repo =~ m{://|^[^/]*:}) {
535     # $repo is a git url we can clone, like git:// or https:// or
536     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
537     # not recognized here because distinguishing that from a local
538     # path is too fragile. If you really need something strange here,
539     # use the ssh:// form.
540     $repo_location = 'remote';
541   } elsif ($repo =~ m{^\.*/}) {
542     # $repo is a local path to a git index. We'll also resolve ../foo
543     # to ../foo/.git if the latter is a directory. To help
544     # disambiguate local paths from named hosted repositories, this
545     # form must be given as ./ or ../ if it's a relative path.
546     if (-d "$repo/.git") {
547       $repo = "$repo/.git";
548     }
549     $repo_location = 'local';
550   } else {
551     # $repo is none of the above. It must be the name of a hosted
552     # repository.
553     my $arv_repo_list = api_call("repositories/list",
554                                  'filters' => [['name','=',$repo]]);
555     my @repos_found = @{$arv_repo_list->{'items'}};
556     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
557     if ($n_found > 0) {
558       Log(undef, "Repository '$repo' -> "
559           . join(", ", map { $_->{'uuid'} } @repos_found));
560     }
561     if ($n_found != 1) {
562       croak("Error: Found $n_found repositories with name '$repo'.");
563     }
564     $repo = $repos_found[0]->{'fetch_url'};
565     $repo_location = 'remote';
566   }
567   Log(undef, "Using $repo_location repository '$repo'");
568   $ENV{"CRUNCH_SRC_URL"} = $repo;
569
570   # Resolve given script_version (we'll call that $treeish here) to a
571   # commit sha1 ($commit).
572   my $treeish = $Job->{'script_version'};
573   my $commit;
574   if ($repo_location eq 'remote') {
575     # We minimize excess object-fetching by re-using the same bare
576     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
577     # just keep adding remotes to it as needed.
578     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
579     my $gitcmd = "git --git-dir=\Q$local_repo\E";
580
581     # Set up our local repo for caching remote objects, making
582     # archives, etc.
583     if (!-d $local_repo) {
584       make_path($local_repo) or croak("Error: could not create $local_repo");
585     }
586     # This works (exits 0 and doesn't delete fetched objects) even
587     # if $local_repo is already initialized:
588     `$gitcmd init --bare`;
589     if ($?) {
590       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
591     }
592
593     # If $treeish looks like a hash (or abbrev hash) we look it up in
594     # our local cache first, since that's cheaper. (We don't want to
595     # do that with tags/branches though -- those change over time, so
596     # they should always be resolved by the remote repo.)
597     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
598       # Hide stderr because it's normal for this to fail:
599       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
600       if ($? == 0 &&
601           # Careful not to resolve a branch named abcdeff to commit 1234567:
602           $sha1 =~ /^$treeish/ &&
603           $sha1 =~ /^([0-9a-f]{40})$/s) {
604         $commit = $1;
605         Log(undef, "Commit $commit already present in $local_repo");
606       }
607     }
608
609     if (!defined $commit) {
610       # If $treeish isn't just a hash or abbrev hash, or isn't here
611       # yet, we need to fetch the remote to resolve it correctly.
612
613       # First, remove all local heads. This prevents a name that does
614       # not exist on the remote from resolving to (or colliding with)
615       # a previously fetched branch or tag (possibly from a different
616       # remote).
617       remove_tree("$local_repo/refs/heads", {keep_root => 1});
618
619       Log(undef, "Fetching objects from $repo to $local_repo");
620       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
621       if ($?) {
622         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
623       }
624     }
625
626     # Now that the data is all here, we will use our local repo for
627     # the rest of our git activities.
628     $repo = $local_repo;
629   }
630
631   my $gitcmd = "git --git-dir=\Q$repo\E";
632   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
633   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
634     croak("`$gitcmd rev-list` exited "
635           .exit_status_s($?)
636           .", '$treeish' not found, giving up");
637   }
638   $commit = $1;
639   Log(undef, "Version $treeish is commit $commit");
640
641   if ($commit ne $Job->{'script_version'}) {
642     # Record the real commit id in the database, frozentokey, logs,
643     # etc. -- instead of an abbreviation or a branch name which can
644     # become ambiguous or point to a different commit in the future.
645     if (!$Job->update_attributes('script_version' => $commit)) {
646       croak("Error: failed to update job's script_version attribute");
647     }
648   }
649
650   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
651   add_git_archive("$gitcmd archive ''\Q$commit\E");
652 }
653
654 my $git_archive = combined_git_archive();
655 if (!defined $git_archive) {
656   Log(undef, "Skip install phase (no git archive)");
657   if ($have_slurm) {
658     Log(undef, "Warning: This probably means workers have no source tree!");
659   }
660 }
661 else {
662   my $exited;
663   my $install_script_tries_left = 3;
664   for (my $attempts = 0; $attempts < 3; $attempts++) {
665     my @srunargs = ("srun",
666                     "--nodelist=$nodelist",
667                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
668     my @execargs = ("sh", "-c",
669                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
670
671     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
672     my ($stdout, $stderr);
673     ($exited, $stdout, $stderr) = srun_sync(
674       \@srunargs, \@execargs,
675       {label => "run install script on all workers"},
676       $build_script . $git_archive);
677
678     my $stderr_anything_from_script = 0;
679     for my $line (split(/\n/, $stderr)) {
680       if ($line !~ /^(srun: error: |starting: \[)/) {
681         $stderr_anything_from_script = 1;
682       }
683     }
684
685     last if $exited == 0 || $main::please_freeze;
686
687     # If the install script fails but doesn't print an error message,
688     # the next thing anyone is likely to do is just run it again in
689     # case it was a transient problem like "slurm communication fails
690     # because the network isn't reliable enough". So we'll just do
691     # that ourselves (up to 3 attempts in total). OTOH, if there is an
692     # error message, the problem is more likely to have a real fix and
693     # we should fail the job so the fixing process can start, instead
694     # of doing 2 more attempts.
695     last if $stderr_anything_from_script;
696   }
697
698   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
699     unlink($tar_filename);
700   }
701
702   if ($exited != 0) {
703     croak("Giving up");
704   }
705 }
706
707 foreach (qw (script script_version script_parameters runtime_constraints))
708 {
709   Log (undef,
710        "$_ " .
711        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
712 }
713 foreach (split (/\n/, $Job->{knobs}))
714 {
715   Log (undef, "knob " . $_);
716 }
717
718
719
720 $main::success = undef;
721
722
723
724 ONELEVEL:
725
726 my $thisround_succeeded = 0;
727 my $thisround_failed = 0;
728 my $thisround_failed_multiple = 0;
729 my $working_slot_count = scalar(@slot);
730
731 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
732                        or $a <=> $b } @jobstep_todo;
733 my $level = $jobstep[$jobstep_todo[0]]->{level};
734
735 my $initial_tasks_this_level = 0;
736 foreach my $id (@jobstep_todo) {
737   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
738 }
739
740 # If the number of tasks scheduled at this level #T is smaller than the number
741 # of slots available #S, only use the first #T slots, or the first slot on
742 # each node, whichever number is greater.
743 #
744 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
745 # based on these numbers.  Using fewer slots makes more resources available
746 # to each individual task, which should normally be a better strategy when
747 # there are fewer of them running with less parallelism.
748 #
749 # Note that this calculation is not redone if the initial tasks at
750 # this level queue more tasks at the same level.  This may harm
751 # overall task throughput for that level.
752 my @freeslot;
753 if ($initial_tasks_this_level < @node) {
754   @freeslot = (0..$#node);
755 } elsif ($initial_tasks_this_level < @slot) {
756   @freeslot = (0..$initial_tasks_this_level - 1);
757 } else {
758   @freeslot = (0..$#slot);
759 }
760 my $round_num_freeslots = scalar(@freeslot);
761 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
762
763 my %round_max_slots = ();
764 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
765   my $this_slot = $slot[$freeslot[$ii]];
766   my $node_name = $this_slot->{node}->{name};
767   $round_max_slots{$node_name} ||= $this_slot->{cpu};
768   last if (scalar(keys(%round_max_slots)) >= @node);
769 }
770
771 Log(undef, "start level $level with $round_num_freeslots slots");
772 my @holdslot;
773 my %reader;
774 my $progress_is_dirty = 1;
775 my $progress_stats_updated = 0;
776
777 update_progress_stats();
778
779
780 THISROUND:
781 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
782 {
783   # Don't create new tasks if we already know the job's final result.
784   last if defined($main::success);
785
786   my $id = $jobstep_todo[$todo_ptr];
787   my $Jobstep = $jobstep[$id];
788   if ($Jobstep->{level} != $level)
789   {
790     next;
791   }
792
793   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
794   set_nonblocking($reader{$id});
795
796   my $childslot = $freeslot[0];
797   my $childnode = $slot[$childslot]->{node};
798   my $childslotname = join (".",
799                             $slot[$childslot]->{node}->{name},
800                             $slot[$childslot]->{cpu});
801
802   my $childpid = fork();
803   if ($childpid == 0)
804   {
805     $SIG{'INT'} = 'DEFAULT';
806     $SIG{'QUIT'} = 'DEFAULT';
807     $SIG{'TERM'} = 'DEFAULT';
808
809     foreach (values (%reader))
810     {
811       close($_);
812     }
813     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
814     open(STDOUT,">&writer");
815     open(STDERR,">&writer");
816
817     undef $dbh;
818     undef $sth;
819
820     delete $ENV{"GNUPGHOME"};
821     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
822     $ENV{"TASK_QSEQUENCE"} = $id;
823     $ENV{"TASK_SEQUENCE"} = $level;
824     $ENV{"JOB_SCRIPT"} = $Job->{script};
825     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
826       $param =~ tr/a-z/A-Z/;
827       $ENV{"JOB_PARAMETER_$param"} = $value;
828     }
829     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
830     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
831     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
832     $ENV{"HOME"} = $ENV{"TASK_WORK"};
833     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
834     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
835     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
836
837     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
838
839     $ENV{"GZIP"} = "-n";
840
841     my @srunargs = (
842       "srun",
843       "--nodelist=".$childnode->{name},
844       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
845       "--job-name=$job_id.$id.$$",
846         );
847
848     my $stdbuf = " stdbuf --output=0 --error=0 ";
849
850     my $arv_file_cache = "";
851     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
852       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
853     }
854
855     my $command =
856         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
857         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
858         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
859         # These environment variables get used explicitly later in
860         # $command.  No tool is expected to read these values directly.
861         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
862         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
863         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
864         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
865
866     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
867     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
868     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
869
870     if ($docker_hash)
871     {
872       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
873       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
874       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
875       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
876       # We only set memory limits if Docker lets us limit both memory and swap.
877       # Memory limits alone have been supported longer, but subprocesses tend
878       # to get SIGKILL if they exceed that without any swap limit set.
879       # See #5642 for additional background.
880       if ($docker_limitmem) {
881         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
882       }
883
884       # The source tree and $destdir directory (which we have
885       # installed on the worker host) are available in the container,
886       # under the same path.
887       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
888       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
889
890       # Currently, we make the "by_pdh" directory in arv-mount's mount
891       # point appear at /keep inside the container (instead of using
892       # the same path as the host like we do with CRUNCH_SRC and
893       # CRUNCH_INSTALL). However, crunch scripts and utilities must
894       # not rely on this. They must use $TASK_KEEPMOUNT.
895       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
896       $ENV{TASK_KEEPMOUNT} = "/keep";
897
898       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
899       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
900       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
901
902       # TASK_WORK is almost exactly like a docker data volume: it
903       # starts out empty, is writable, and persists until no
904       # containers use it any more. We don't use --volumes-from to
905       # share it with other containers: it is only accessible to this
906       # task, and it goes away when this task stops.
907       #
908       # However, a docker data volume is writable only by root unless
909       # the mount point already happens to exist in the container with
910       # different permissions. Therefore, we [1] assume /tmp already
911       # exists in the image and is writable by the crunch user; [2]
912       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
913       # writable if they are created by docker while setting up the
914       # other --volumes); and [3] create $TASK_WORK inside the
915       # container using $build_script.
916       $command .= "--volume=/tmp ";
917       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
918       $ENV{"HOME"} = $ENV{"TASK_WORK"};
919       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
920
921       # TODO: Share a single JOB_WORK volume across all task
922       # containers on a given worker node, and delete it when the job
923       # ends (and, in case that doesn't work, when the next job
924       # starts).
925       #
926       # For now, use the same approach as TASK_WORK above.
927       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
928
929       # Bind mount the crunchrunner binary and host TLS certificates file into
930       # the container.
931       $command .= "--volume=\Q$ENV{HOST_CRUNCHRUNNER_BIN}:/usr/local/bin/crunchrunner\E ";
932       $command .= "--volume=\Q$ENV{HOST_CERTS}:/etc/arvados/ca-certificates.crt\E ";
933
934       while (my ($env_key, $env_val) = each %ENV)
935       {
936         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
937           $command .= "--env=\Q$env_key=$env_val\E ";
938         }
939       }
940       $command .= "--env=\QHOME=$ENV{HOME}\E ";
941       $command .= "\Q$docker_hash\E ";
942
943       if ($Job->{arvados_sdk_version}) {
944         $command .= $stdbuf;
945         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
946       } else {
947         $command .= "/bin/sh -c \'python -c " .
948             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
949             ">&2 2>/dev/null; " .
950             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
951             "if which stdbuf >/dev/null ; then " .
952             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
953             " else " .
954             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
955             " fi\'";
956       }
957     } else {
958       # Non-docker run
959       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
960       $command .= $stdbuf;
961       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
962     }
963
964     my @execargs = ('bash', '-c', $command);
965     srun (\@srunargs, \@execargs, undef, $build_script);
966     # exec() failed, we assume nothing happened.
967     die "srun() failed on build script\n";
968   }
969   close("writer");
970   if (!defined $childpid)
971   {
972     close $reader{$id};
973     delete $reader{$id};
974     next;
975   }
976   shift @freeslot;
977   $proc{$childpid} = {
978     jobstepidx => $id,
979     time => time,
980     slot => $childslot,
981     jobstepname => "$job_id.$id.$childpid",
982   };
983   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
984   $slot[$childslot]->{pid} = $childpid;
985
986   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
987   Log ($id, "child $childpid started on $childslotname");
988   $Jobstep->{starttime} = time;
989   $Jobstep->{node} = $childnode->{name};
990   $Jobstep->{slotindex} = $childslot;
991   delete $Jobstep->{stderr};
992   delete $Jobstep->{finishtime};
993   delete $Jobstep->{tempfail};
994
995   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
996   $Jobstep->{'arvados_task'}->save;
997
998   splice @jobstep_todo, $todo_ptr, 1;
999   --$todo_ptr;
1000
1001   $progress_is_dirty = 1;
1002
1003   while (!@freeslot
1004          ||
1005          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1006   {
1007     last THISROUND if $main::please_freeze;
1008     if ($main::please_info)
1009     {
1010       $main::please_info = 0;
1011       freeze();
1012       create_output_collection();
1013       save_meta(1);
1014       update_progress_stats();
1015     }
1016     my $gotsome
1017         = readfrompipes ()
1018         + reapchildren ();
1019     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1020     {
1021       check_refresh_wanted();
1022       check_squeue();
1023       update_progress_stats();
1024     }
1025     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1026     {
1027       update_progress_stats();
1028     }
1029     if (!$gotsome) {
1030       select (undef, undef, undef, 0.1);
1031     }
1032     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1033                                         $_->{node}->{hold_count} < 4 } @slot);
1034     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1035         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1036     {
1037       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1038           .($thisround_failed+$thisround_succeeded)
1039           .") -- giving up on this round";
1040       Log (undef, $message);
1041       last THISROUND;
1042     }
1043
1044     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1045     for (my $i=$#freeslot; $i>=0; $i--) {
1046       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1047         push @holdslot, (splice @freeslot, $i, 1);
1048       }
1049     }
1050     for (my $i=$#holdslot; $i>=0; $i--) {
1051       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1052         push @freeslot, (splice @holdslot, $i, 1);
1053       }
1054     }
1055
1056     # give up if no nodes are succeeding
1057     if ($working_slot_count < 1) {
1058       Log(undef, "Every node has failed -- giving up");
1059       last THISROUND;
1060     }
1061   }
1062 }
1063
1064
1065 push @freeslot, splice @holdslot;
1066 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1067
1068
1069 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1070 while (%proc)
1071 {
1072   if ($main::please_continue) {
1073     $main::please_continue = 0;
1074     goto THISROUND;
1075   }
1076   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1077   readfrompipes ();
1078   if (!reapchildren())
1079   {
1080     check_refresh_wanted();
1081     check_squeue();
1082     update_progress_stats();
1083     select (undef, undef, undef, 0.1);
1084     killem (keys %proc) if $main::please_freeze;
1085   }
1086 }
1087
1088 update_progress_stats();
1089 freeze_if_want_freeze();
1090
1091
1092 if (!defined $main::success)
1093 {
1094   if (!@jobstep_todo) {
1095     $main::success = 1;
1096   } elsif ($working_slot_count < 1) {
1097     save_output_collection();
1098     save_meta();
1099     exit(EX_RETRY_UNLOCKED);
1100   } elsif ($thisround_succeeded == 0 &&
1101            ($thisround_failed == 0 || $thisround_failed > 4)) {
1102     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1103     Log (undef, $message);
1104     $main::success = 0;
1105   }
1106 }
1107
1108 goto ONELEVEL if !defined $main::success;
1109
1110
1111 release_allocation();
1112 freeze();
1113 my $collated_output = save_output_collection();
1114 Log (undef, "finish");
1115
1116 save_meta();
1117
1118 my $final_state;
1119 if ($collated_output && $main::success) {
1120   $final_state = 'Complete';
1121 } else {
1122   $final_state = 'Failed';
1123 }
1124 $Job->update_attributes('state' => $final_state);
1125
1126 exit (($final_state eq 'Complete') ? 0 : 1);
1127
1128
1129
1130 sub update_progress_stats
1131 {
1132   $progress_stats_updated = time;
1133   return if !$progress_is_dirty;
1134   my ($todo, $done, $running) = (scalar @jobstep_todo,
1135                                  scalar @jobstep_done,
1136                                  scalar keys(%proc));
1137   $Job->{'tasks_summary'} ||= {};
1138   $Job->{'tasks_summary'}->{'todo'} = $todo;
1139   $Job->{'tasks_summary'}->{'done'} = $done;
1140   $Job->{'tasks_summary'}->{'running'} = $running;
1141   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1142   Log (undef, "status: $done done, $running running, $todo todo");
1143   $progress_is_dirty = 0;
1144 }
1145
1146
1147
1148 sub reapchildren
1149 {
1150   my $children_reaped = 0;
1151   my @successful_task_uuids = ();
1152
1153   while((my $pid = waitpid (-1, WNOHANG)) > 0)
1154   {
1155     my $childstatus = $?;
1156
1157     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1158                     . "."
1159                     . $slot[$proc{$pid}->{slot}]->{cpu});
1160     my $jobstepidx = $proc{$pid}->{jobstepidx};
1161
1162     if (!WIFEXITED($childstatus))
1163     {
1164       # child did not exit (may be temporarily stopped)
1165       Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
1166       next;
1167     }
1168
1169     $children_reaped++;
1170     my $elapsed = time - $proc{$pid}->{time};
1171     my $Jobstep = $jobstep[$jobstepidx];
1172
1173     my $exitvalue = $childstatus >> 8;
1174     my $exitinfo = "exit ".exit_status_s($childstatus);
1175     $Jobstep->{'arvados_task'}->reload;
1176     my $task_success = $Jobstep->{'arvados_task'}->{success};
1177
1178     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1179
1180     if (!defined $task_success) {
1181       # task did not indicate one way or the other --> fail
1182       Log($jobstepidx, sprintf(
1183             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1184             exit_status_s($childstatus)));
1185       $Jobstep->{'arvados_task'}->{success} = 0;
1186       $Jobstep->{'arvados_task'}->save;
1187       $task_success = 0;
1188     }
1189
1190     if (!$task_success)
1191     {
1192       my $temporary_fail;
1193       $temporary_fail ||= $Jobstep->{tempfail};
1194       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1195
1196       ++$thisround_failed;
1197       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1198
1199       # Check for signs of a failed or misconfigured node
1200       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1201           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1202         # Don't count this against jobstep failure thresholds if this
1203         # node is already suspected faulty and srun exited quickly
1204         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1205             $elapsed < 5) {
1206           Log ($jobstepidx, "blaming failure on suspect node " .
1207                $slot[$proc{$pid}->{slot}]->{node}->{name});
1208           $temporary_fail ||= 1;
1209         }
1210         ban_node_by_slot($proc{$pid}->{slot});
1211       }
1212
1213       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1214                                 ++$Jobstep->{'failures'},
1215                                 $temporary_fail ? 'temporary' : 'permanent',
1216                                 $elapsed));
1217
1218       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1219         # Give up on this task, and the whole job
1220         $main::success = 0;
1221       }
1222       # Put this task back on the todo queue
1223       push @jobstep_todo, $jobstepidx;
1224       $Job->{'tasks_summary'}->{'failed'}++;
1225     }
1226     else # task_success
1227     {
1228       push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1229       ++$thisround_succeeded;
1230       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1231       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1232       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1233       push @jobstep_done, $jobstepidx;
1234       Log ($jobstepidx, "success in $elapsed seconds");
1235     }
1236     $Jobstep->{exitcode} = $childstatus;
1237     $Jobstep->{finishtime} = time;
1238     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1239     $Jobstep->{'arvados_task'}->save;
1240     process_stderr_final ($jobstepidx);
1241     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1242                               length($Jobstep->{'arvados_task'}->{output}),
1243                               $Jobstep->{'arvados_task'}->{output}));
1244
1245     close $reader{$jobstepidx};
1246     delete $reader{$jobstepidx};
1247     delete $slot[$proc{$pid}->{slot}]->{pid};
1248     push @freeslot, $proc{$pid}->{slot};
1249     delete $proc{$pid};
1250
1251     $progress_is_dirty = 1;
1252   }
1253
1254   if (scalar(@successful_task_uuids) > 0)
1255   {
1256     Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1257     # Load new tasks
1258     my $newtask_list = [];
1259     my $newtask_results;
1260     do {
1261       $newtask_results = api_call(
1262         "job_tasks/list",
1263         'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1264         'order' => 'qsequence',
1265         'offset' => scalar(@$newtask_list),
1266           );
1267       push(@$newtask_list, @{$newtask_results->{items}});
1268     } while (@{$newtask_results->{items}});
1269     Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1270     foreach my $arvados_task (@$newtask_list) {
1271       my $jobstep = {
1272         'level' => $arvados_task->{'sequence'},
1273         'failures' => 0,
1274         'arvados_task' => $arvados_task
1275       };
1276       push @jobstep, $jobstep;
1277       push @jobstep_todo, $#jobstep;
1278     }
1279   }
1280
1281   return $children_reaped;
1282 }
1283
1284 sub check_refresh_wanted
1285 {
1286   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1287   if (@stat &&
1288       $stat[9] > $latest_refresh &&
1289       # ...and we have actually locked the job record...
1290       $job_id eq $Job->{'uuid'}) {
1291     $latest_refresh = scalar time;
1292     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1293     for my $attr ('cancelled_at',
1294                   'cancelled_by_user_uuid',
1295                   'cancelled_by_client_uuid',
1296                   'state') {
1297       $Job->{$attr} = $Job2->{$attr};
1298     }
1299     if ($Job->{'state'} ne "Running") {
1300       if ($Job->{'state'} eq "Cancelled") {
1301         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1302       } else {
1303         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1304       }
1305       $main::success = 0;
1306       $main::please_freeze = 1;
1307     }
1308   }
1309 }
1310
1311 sub check_squeue
1312 {
1313   my $last_squeue_check = $squeue_checked;
1314
1315   # Do not call `squeue` or check the kill list more than once every
1316   # 15 seconds.
1317   return if $last_squeue_check > time - 15;
1318   $squeue_checked = time;
1319
1320   # Look for children from which we haven't received stderr data since
1321   # the last squeue check. If no such children exist, all procs are
1322   # alive and there's no need to even look at squeue.
1323   #
1324   # As long as the crunchstat poll interval (10s) is shorter than the
1325   # squeue check interval (15s) this should make the squeue check an
1326   # infrequent event.
1327   my $silent_procs = 0;
1328   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1329   {
1330     if (!exists($js->{stderr_at}))
1331     {
1332       $js->{stderr_at} = 0;
1333     }
1334     if ($js->{stderr_at} < $last_squeue_check)
1335     {
1336       $silent_procs++;
1337     }
1338   }
1339   return if $silent_procs == 0;
1340
1341   # use killem() on procs whose killtime is reached
1342   while (my ($pid, $procinfo) = each %proc)
1343   {
1344     my $js = $jobstep[$procinfo->{jobstepidx}];
1345     if (exists $procinfo->{killtime}
1346         && $procinfo->{killtime} <= time
1347         && $js->{stderr_at} < $last_squeue_check)
1348     {
1349       my $sincewhen = "";
1350       if ($js->{stderr_at}) {
1351         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1352       }
1353       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1354       killem ($pid);
1355     }
1356   }
1357
1358   if (!$have_slurm)
1359   {
1360     # here is an opportunity to check for mysterious problems with local procs
1361     return;
1362   }
1363
1364   # Get a list of steps still running.  Note: squeue(1) says --steps
1365   # selects a format (which we override anyway) and allows us to
1366   # specify which steps we're interested in (which we don't).
1367   # Importantly, it also changes the meaning of %j from "job name" to
1368   # "step name" and (although this isn't mentioned explicitly in the
1369   # docs) switches from "one line per job" mode to "one line per step"
1370   # mode. Without it, we'd just get a list of one job, instead of a
1371   # list of N steps.
1372   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1373   if ($? != 0)
1374   {
1375     Log(undef, "warning: squeue exit status $? ($!)");
1376     return;
1377   }
1378   chop @squeue;
1379
1380   # which of my jobsteps are running, according to squeue?
1381   my %ok;
1382   for my $jobstepname (@squeue)
1383   {
1384     $ok{$jobstepname} = 1;
1385   }
1386
1387   # Check for child procs >60s old and not mentioned by squeue.
1388   while (my ($pid, $procinfo) = each %proc)
1389   {
1390     if ($procinfo->{time} < time - 60
1391         && $procinfo->{jobstepname}
1392         && !exists $ok{$procinfo->{jobstepname}}
1393         && !exists $procinfo->{killtime})
1394     {
1395       # According to slurm, this task has ended (successfully or not)
1396       # -- but our srun child hasn't exited. First we must wait (30
1397       # seconds) in case this is just a race between communication
1398       # channels. Then, if our srun child process still hasn't
1399       # terminated, we'll conclude some slurm communication
1400       # error/delay has caused the task to die without notifying srun,
1401       # and we'll kill srun ourselves.
1402       $procinfo->{killtime} = time + 30;
1403       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1404     }
1405   }
1406 }
1407
1408
1409 sub release_allocation
1410 {
1411   if ($have_slurm)
1412   {
1413     Log (undef, "release job allocation");
1414     system "scancel $ENV{SLURM_JOB_ID}";
1415   }
1416 }
1417
1418
1419 sub readfrompipes
1420 {
1421   my $gotsome = 0;
1422   my %fd_job;
1423   my $sel = IO::Select->new();
1424   foreach my $jobstepidx (keys %reader)
1425   {
1426     my $fd = $reader{$jobstepidx};
1427     $sel->add($fd);
1428     $fd_job{$fd} = $jobstepidx;
1429
1430     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1431       $sel->add($stdout_fd);
1432       $fd_job{$stdout_fd} = $jobstepidx;
1433     }
1434   }
1435   # select on all reader fds with 0.1s timeout
1436   my @ready_fds = $sel->can_read(0.1);
1437   foreach my $fd (@ready_fds)
1438   {
1439     my $buf;
1440     if (0 < sysread ($fd, $buf, 65536))
1441     {
1442       $gotsome = 1;
1443       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1444
1445       my $jobstepidx = $fd_job{$fd};
1446       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1447         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1448         next;
1449       }
1450
1451       $jobstep[$jobstepidx]->{stderr_at} = time;
1452       $jobstep[$jobstepidx]->{stderr} .= $buf;
1453
1454       # Consume everything up to the last \n
1455       preprocess_stderr ($jobstepidx);
1456
1457       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1458       {
1459         # If we get a lot of stderr without a newline, chop off the
1460         # front to avoid letting our buffer grow indefinitely.
1461         substr ($jobstep[$jobstepidx]->{stderr},
1462                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1463       }
1464     }
1465   }
1466   return $gotsome;
1467 }
1468
1469
1470 # Consume all full lines of stderr for a jobstep. Everything after the
1471 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1472 # returning.
1473 sub preprocess_stderr
1474 {
1475   my $jobstepidx = shift;
1476
1477   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1478     my $line = $1;
1479     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1480     Log ($jobstepidx, "stderr $line");
1481     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1482       # whoa.
1483       $main::please_freeze = 1;
1484     }
1485     elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
1486       # Skip the following tempfail checks if this srun proc isn't
1487       # attached to a particular worker slot.
1488     }
1489     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1490       my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1491       $slot[$job_slot_index]->{node}->{fail_count}++;
1492       $jobstep[$jobstepidx]->{tempfail} = 1;
1493       ban_node_by_slot($job_slot_index);
1494     }
1495     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1496       $jobstep[$jobstepidx]->{tempfail} = 1;
1497       ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
1498     }
1499     elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1500       $jobstep[$jobstepidx]->{tempfail} = 1;
1501     }
1502   }
1503 }
1504
1505
1506 sub process_stderr_final
1507 {
1508   my $jobstepidx = shift;
1509   preprocess_stderr ($jobstepidx);
1510
1511   map {
1512     Log ($jobstepidx, "stderr $_");
1513   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1514   $jobstep[$jobstepidx]->{stderr} = '';
1515 }
1516
1517 sub fetch_block
1518 {
1519   my $hash = shift;
1520   my $keep;
1521   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1522     Log(undef, "fetch_block run error from arv-get $hash: $!");
1523     return undef;
1524   }
1525   my $output_block = "";
1526   while (1) {
1527     my $buf;
1528     my $bytes = sysread($keep, $buf, 1024 * 1024);
1529     if (!defined $bytes) {
1530       Log(undef, "fetch_block read error from arv-get: $!");
1531       $output_block = undef;
1532       last;
1533     } elsif ($bytes == 0) {
1534       # sysread returns 0 at the end of the pipe.
1535       last;
1536     } else {
1537       # some bytes were read into buf.
1538       $output_block .= $buf;
1539     }
1540   }
1541   close $keep;
1542   if ($?) {
1543     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1544     $output_block = undef;
1545   }
1546   return $output_block;
1547 }
1548
1549 # Create a collection by concatenating the output of all tasks (each
1550 # task's output is either a manifest fragment, a locator for a
1551 # manifest fragment stored in Keep, or nothing at all). Return the
1552 # portable_data_hash of the new collection.
1553 sub create_output_collection
1554 {
1555   Log (undef, "collate");
1556
1557   my ($child_out, $child_in);
1558   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1559 import arvados
1560 import sys
1561 print (arvados.api("v1").collections().
1562        create(body={"manifest_text": sys.stdin.read()}).
1563        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1564 }, retry_count());
1565
1566   my $task_idx = -1;
1567   my $manifest_size = 0;
1568   for (@jobstep)
1569   {
1570     ++$task_idx;
1571     my $output = $_->{'arvados_task'}->{output};
1572     next if (!defined($output));
1573     my $next_write;
1574     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1575       $next_write = fetch_block($output);
1576     } else {
1577       $next_write = $output;
1578     }
1579     if (defined($next_write)) {
1580       if (!defined(syswrite($child_in, $next_write))) {
1581         # There's been an error writing.  Stop the loop.
1582         # We'll log details about the exit code later.
1583         last;
1584       } else {
1585         $manifest_size += length($next_write);
1586       }
1587     } else {
1588       my $uuid = $_->{'arvados_task'}->{'uuid'};
1589       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1590       $main::success = 0;
1591     }
1592   }
1593   close($child_in);
1594   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1595
1596   my $joboutput;
1597   my $s = IO::Select->new($child_out);
1598   if ($s->can_read(120)) {
1599     sysread($child_out, $joboutput, 1024 * 1024);
1600     waitpid($pid, 0);
1601     if ($?) {
1602       Log(undef, "output collection creation exited " . exit_status_s($?));
1603       $joboutput = undef;
1604     } else {
1605       chomp($joboutput);
1606     }
1607   } else {
1608     Log (undef, "timed out while creating output collection");
1609     foreach my $signal (2, 2, 2, 15, 15, 9) {
1610       kill($signal, $pid);
1611       last if waitpid($pid, WNOHANG) == -1;
1612       sleep(1);
1613     }
1614   }
1615   close($child_out);
1616
1617   return $joboutput;
1618 }
1619
1620 # Calls create_output_collection, logs the result, and returns it.
1621 # If that was successful, save that as the output in the job record.
1622 sub save_output_collection {
1623   my $collated_output = create_output_collection();
1624
1625   if (!$collated_output) {
1626     Log(undef, "Failed to write output collection");
1627   }
1628   else {
1629     Log(undef, "job output $collated_output");
1630     $Job->update_attributes('output' => $collated_output);
1631   }
1632   return $collated_output;
1633 }
1634
1635 sub killem
1636 {
1637   foreach (@_)
1638   {
1639     my $sig = 2;                # SIGINT first
1640     if (exists $proc{$_}->{"sent_$sig"} &&
1641         time - $proc{$_}->{"sent_$sig"} > 4)
1642     {
1643       $sig = 15;                # SIGTERM if SIGINT doesn't work
1644     }
1645     if (exists $proc{$_}->{"sent_$sig"} &&
1646         time - $proc{$_}->{"sent_$sig"} > 4)
1647     {
1648       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1649     }
1650     if (!exists $proc{$_}->{"sent_$sig"})
1651     {
1652       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1653       kill $sig, $_;
1654       select (undef, undef, undef, 0.1);
1655       if ($sig == 2)
1656       {
1657         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1658       }
1659       $proc{$_}->{"sent_$sig"} = time;
1660       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1661     }
1662   }
1663 }
1664
1665
1666 sub fhbits
1667 {
1668   my($bits);
1669   for (@_) {
1670     vec($bits,fileno($_),1) = 1;
1671   }
1672   $bits;
1673 }
1674
1675
1676 # Send log output to Keep via arv-put.
1677 #
1678 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1679 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1680 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1681 # $log_pipe_pid is the pid of the arv-put subprocess.
1682 #
1683 # The only functions that should access these variables directly are:
1684 #
1685 # log_writer_start($logfilename)
1686 #     Starts an arv-put pipe, reading data on stdin and writing it to
1687 #     a $logfilename file in an output collection.
1688 #
1689 # log_writer_read_output([$timeout])
1690 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1691 #     Passes $timeout to the select() call, with a default of 0.01.
1692 #     Returns the result of the last read() call on $log_pipe_out, or
1693 #     -1 if read() wasn't called because select() timed out.
1694 #     Only other log_writer_* functions should need to call this.
1695 #
1696 # log_writer_send($txt)
1697 #     Writes $txt to the output log collection.
1698 #
1699 # log_writer_finish()
1700 #     Closes the arv-put pipe and returns the output that it produces.
1701 #
1702 # log_writer_is_active()
1703 #     Returns a true value if there is currently a live arv-put
1704 #     process, false otherwise.
1705 #
1706 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1707     $log_pipe_pid);
1708
1709 sub log_writer_start($)
1710 {
1711   my $logfilename = shift;
1712   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1713                         'arv-put',
1714                         '--stream',
1715                         '--retries', '3',
1716                         '--filename', $logfilename,
1717                         '-');
1718   $log_pipe_out_buf = "";
1719   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1720 }
1721
1722 sub log_writer_read_output {
1723   my $timeout = shift || 0.01;
1724   my $read = -1;
1725   while ($read && $log_pipe_out_select->can_read($timeout)) {
1726     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1727                  length($log_pipe_out_buf));
1728   }
1729   if (!defined($read)) {
1730     Log(undef, "error reading log manifest from arv-put: $!");
1731   }
1732   return $read;
1733 }
1734
1735 sub log_writer_send($)
1736 {
1737   my $txt = shift;
1738   print $log_pipe_in $txt;
1739   log_writer_read_output();
1740 }
1741
1742 sub log_writer_finish()
1743 {
1744   return unless $log_pipe_pid;
1745
1746   close($log_pipe_in);
1747
1748   my $logger_failed = 0;
1749   my $read_result = log_writer_read_output(120);
1750   if ($read_result == -1) {
1751     $logger_failed = -1;
1752     Log (undef, "timed out reading from 'arv-put'");
1753   } elsif ($read_result != 0) {
1754     $logger_failed = -2;
1755     Log(undef, "failed to read arv-put log manifest to EOF");
1756   }
1757
1758   waitpid($log_pipe_pid, 0);
1759   if ($?) {
1760     $logger_failed ||= $?;
1761     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1762   }
1763
1764   close($log_pipe_out);
1765   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1766   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1767       $log_pipe_out_select = undef;
1768
1769   return $arv_put_output;
1770 }
1771
1772 sub log_writer_is_active() {
1773   return $log_pipe_pid;
1774 }
1775
1776 sub Log                         # ($jobstepidx, $logmessage)
1777 {
1778   my ($jobstepidx, $logmessage) = @_;
1779   if ($logmessage =~ /\n/) {
1780     for my $line (split (/\n/, $_[1])) {
1781       Log ($jobstepidx, $line);
1782     }
1783     return;
1784   }
1785   my $fh = select STDERR; $|=1; select $fh;
1786   my $task_qseq = '';
1787   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1788     $task_qseq = $jobstepidx;
1789   }
1790   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1791   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1792   $message .= "\n";
1793   my $datetime;
1794   if (log_writer_is_active() || -t STDERR) {
1795     my @gmtime = gmtime;
1796     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1797                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1798   }
1799   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1800
1801   if (log_writer_is_active()) {
1802     log_writer_send($datetime . " " . $message);
1803   }
1804 }
1805
1806
1807 sub croak
1808 {
1809   my ($package, $file, $line) = caller;
1810   my $message = "@_ at $file line $line\n";
1811   Log (undef, $message);
1812   freeze() if @jobstep_todo;
1813   create_output_collection() if @jobstep_todo;
1814   cleanup();
1815   save_meta();
1816   die;
1817 }
1818
1819
1820 sub cleanup
1821 {
1822   return unless $Job;
1823   if ($Job->{'state'} eq 'Cancelled') {
1824     $Job->update_attributes('finished_at' => scalar gmtime);
1825   } else {
1826     $Job->update_attributes('state' => 'Failed');
1827   }
1828 }
1829
1830
1831 sub save_meta
1832 {
1833   my $justcheckpoint = shift; # false if this will be the last meta saved
1834   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1835   return unless log_writer_is_active();
1836   my $log_manifest = log_writer_finish();
1837   return unless defined($log_manifest);
1838
1839   if ($Job->{log}) {
1840     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1841     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1842   }
1843
1844   my $log_coll = api_call(
1845     "collections/create", ensure_unique_name => 1, collection => {
1846       manifest_text => $log_manifest,
1847       owner_uuid => $Job->{owner_uuid},
1848       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1849     });
1850   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1851   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1852 }
1853
1854
1855 sub freeze_if_want_freeze
1856 {
1857   if ($main::please_freeze)
1858   {
1859     release_allocation();
1860     if (@_)
1861     {
1862       # kill some srun procs before freeze+stop
1863       map { $proc{$_} = {} } @_;
1864       while (%proc)
1865       {
1866         killem (keys %proc);
1867         select (undef, undef, undef, 0.1);
1868         my $died;
1869         while (($died = waitpid (-1, WNOHANG)) > 0)
1870         {
1871           delete $proc{$died};
1872         }
1873       }
1874     }
1875     freeze();
1876     create_output_collection();
1877     cleanup();
1878     save_meta();
1879     exit 1;
1880   }
1881 }
1882
1883
1884 sub freeze
1885 {
1886   Log (undef, "Freeze not implemented");
1887   return;
1888 }
1889
1890
1891 sub thaw
1892 {
1893   croak ("Thaw not implemented");
1894 }
1895
1896
1897 sub freezequote
1898 {
1899   my $s = shift;
1900   $s =~ s/\\/\\\\/g;
1901   $s =~ s/\n/\\n/g;
1902   return $s;
1903 }
1904
1905
1906 sub freezeunquote
1907 {
1908   my $s = shift;
1909   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1910   return $s;
1911 }
1912
1913
1914 sub srun_sync
1915 {
1916   my $srunargs = shift;
1917   my $execargs = shift;
1918   my $opts = shift || {};
1919   my $stdin = shift;
1920
1921   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1922   Log (undef, "$label: start");
1923
1924   my ($stderr_r, $stderr_w);
1925   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1926
1927   my ($stdout_r, $stdout_w);
1928   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1929
1930   my $srunpid = fork();
1931   if ($srunpid == 0)
1932   {
1933     close($stderr_r);
1934     close($stdout_r);
1935     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1936     fcntl($stdout_w, F_SETFL, 0) or croak($!);
1937     open(STDERR, ">&", $stderr_w);
1938     open(STDOUT, ">&", $stdout_w);
1939     srun ($srunargs, $execargs, $opts, $stdin);
1940     exit (1);
1941   }
1942   close($stderr_w);
1943   close($stdout_w);
1944
1945   set_nonblocking($stderr_r);
1946   set_nonblocking($stdout_r);
1947
1948   # Add entries to @jobstep and %proc so check_squeue() and
1949   # freeze_if_want_freeze() can treat it like a job task process.
1950   push @jobstep, {
1951     stderr => '',
1952     stderr_at => 0,
1953     stderr_captured => '',
1954     stdout_r => $stdout_r,
1955     stdout_captured => '',
1956   };
1957   my $jobstepidx = $#jobstep;
1958   $proc{$srunpid} = {
1959     jobstepidx => $jobstepidx,
1960   };
1961   $reader{$jobstepidx} = $stderr_r;
1962
1963   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
1964     my $busy = readfrompipes();
1965     if (!$busy || ($latest_refresh + 2 < scalar time)) {
1966       check_refresh_wanted();
1967       check_squeue();
1968     }
1969     if (!$busy) {
1970       select(undef, undef, undef, 0.1);
1971     }
1972     killem(keys %proc) if $main::please_freeze;
1973   }
1974   my $exited = $?;
1975
1976   1 while readfrompipes();
1977   process_stderr_final ($jobstepidx);
1978
1979   Log (undef, "$label: exit ".exit_status_s($exited));
1980
1981   close($stdout_r);
1982   close($stderr_r);
1983   delete $proc{$srunpid};
1984   delete $reader{$jobstepidx};
1985
1986   my $j = pop @jobstep;
1987   return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
1988 }
1989
1990
1991 sub srun
1992 {
1993   my $srunargs = shift;
1994   my $execargs = shift;
1995   my $opts = shift || {};
1996   my $stdin = shift;
1997   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1998
1999   $Data::Dumper::Terse = 1;
2000   $Data::Dumper::Indent = 0;
2001   my $show_cmd = Dumper($args);
2002   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2003   $show_cmd =~ s/\n/ /g;
2004   if ($opts->{fork}) {
2005     Log(undef, "starting: $show_cmd");
2006   } else {
2007     # This is a child process: parent is in charge of reading our
2008     # stderr and copying it to Log() if needed.
2009     warn "starting: $show_cmd\n";
2010   }
2011
2012   if (defined $stdin) {
2013     my $child = open STDIN, "-|";
2014     defined $child or die "no fork: $!";
2015     if ($child == 0) {
2016       print $stdin or die $!;
2017       close STDOUT or die $!;
2018       exit 0;
2019     }
2020   }
2021
2022   return system (@$args) if $opts->{fork};
2023
2024   exec @$args;
2025   warn "ENV size is ".length(join(" ",%ENV));
2026   die "exec failed: $!: @$args";
2027 }
2028
2029
2030 sub ban_node_by_slot {
2031   # Don't start any new jobsteps on this node for 60 seconds
2032   my $slotid = shift;
2033   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2034   $slot[$slotid]->{node}->{hold_count}++;
2035   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2036 }
2037
2038 sub must_lock_now
2039 {
2040   my ($lockfile, $error_message) = @_;
2041   open L, ">", $lockfile or croak("$lockfile: $!");
2042   if (!flock L, LOCK_EX|LOCK_NB) {
2043     croak("Can't lock $lockfile: $error_message\n");
2044   }
2045 }
2046
2047 sub find_docker_image {
2048   # Given a Keep locator, check to see if it contains a Docker image.
2049   # If so, return its stream name and Docker hash.
2050   # If not, return undef for both values.
2051   my $locator = shift;
2052   my ($streamname, $filename);
2053   my $image = api_call("collections/get", uuid => $locator);
2054   if ($image) {
2055     foreach my $line (split(/\n/, $image->{manifest_text})) {
2056       my @tokens = split(/\s+/, $line);
2057       next if (!@tokens);
2058       $streamname = shift(@tokens);
2059       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2060         if (defined($filename)) {
2061           return (undef, undef);  # More than one file in the Collection.
2062         } else {
2063           $filename = (split(/:/, $filedata, 3))[2];
2064         }
2065       }
2066     }
2067   }
2068   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
2069     return ($streamname, $1);
2070   } else {
2071     return (undef, undef);
2072   }
2073 }
2074
2075 sub retry_count {
2076   # Calculate the number of times an operation should be retried,
2077   # assuming exponential backoff, and that we're willing to retry as
2078   # long as tasks have been running.  Enforce a minimum of 3 retries.
2079   my ($starttime, $endtime, $timediff, $retries);
2080   if (@jobstep) {
2081     $starttime = $jobstep[0]->{starttime};
2082     $endtime = $jobstep[-1]->{finishtime};
2083   }
2084   if (!defined($starttime)) {
2085     $timediff = 0;
2086   } elsif (!defined($endtime)) {
2087     $timediff = time - $starttime;
2088   } else {
2089     $timediff = ($endtime - $starttime) - (time - $endtime);
2090   }
2091   if ($timediff > 0) {
2092     $retries = int(log($timediff) / log(2));
2093   } else {
2094     $retries = 1;  # Use the minimum.
2095   }
2096   return ($retries > 3) ? $retries : 3;
2097 }
2098
2099 sub retry_op {
2100   # Pass in two function references.
2101   # This method will be called with the remaining arguments.
2102   # If it dies, retry it with exponential backoff until it succeeds,
2103   # or until the current retry_count is exhausted.  After each failure
2104   # that can be retried, the second function will be called with
2105   # the current try count (0-based), next try time, and error message.
2106   my $operation = shift;
2107   my $retry_callback = shift;
2108   my $retries = retry_count();
2109   foreach my $try_count (0..$retries) {
2110     my $next_try = time + (2 ** $try_count);
2111     my $result = eval { $operation->(@_); };
2112     if (!$@) {
2113       return $result;
2114     } elsif ($try_count < $retries) {
2115       $retry_callback->($try_count, $next_try, $@);
2116       my $sleep_time = $next_try - time;
2117       sleep($sleep_time) if ($sleep_time > 0);
2118     }
2119   }
2120   # Ensure the error message ends in a newline, so Perl doesn't add
2121   # retry_op's line number to it.
2122   chomp($@);
2123   die($@ . "\n");
2124 }
2125
2126 sub api_call {
2127   # Pass in a /-separated API method name, and arguments for it.
2128   # This function will call that method, retrying as needed until
2129   # the current retry_count is exhausted, with a log on the first failure.
2130   my $method_name = shift;
2131   my $log_api_retry = sub {
2132     my ($try_count, $next_try_at, $errmsg) = @_;
2133     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2134     $errmsg =~ s/\s/ /g;
2135     $errmsg =~ s/\s+$//;
2136     my $retry_msg;
2137     if ($next_try_at < time) {
2138       $retry_msg = "Retrying.";
2139     } else {
2140       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2141       $retry_msg = "Retrying at $next_try_fmt.";
2142     }
2143     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2144   };
2145   my $method = $arv;
2146   foreach my $key (split(/\//, $method_name)) {
2147     $method = $method->{$key};
2148   }
2149   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2150 }
2151
2152 sub exit_status_s {
2153   # Given a $?, return a human-readable exit code string like "0" or
2154   # "1" or "0 with signal 1" or "1 with signal 11".
2155   my $exitcode = shift;
2156   my $s = $exitcode >> 8;
2157   if ($exitcode & 0x7f) {
2158     $s .= " with signal " . ($exitcode & 0x7f);
2159   }
2160   if ($exitcode & 0x80) {
2161     $s .= " with core dump";
2162   }
2163   return $s;
2164 }
2165
2166 sub handle_readall {
2167   # Pass in a glob reference to a file handle.
2168   # Read all its contents and return them as a string.
2169   my $fh_glob_ref = shift;
2170   local $/ = undef;
2171   return <$fh_glob_ref>;
2172 }
2173
2174 sub tar_filename_n {
2175   my $n = shift;
2176   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2177 }
2178
2179 sub add_git_archive {
2180   # Pass in a git archive command as a string or list, a la system().
2181   # This method will save its output to be included in the archive sent to the
2182   # build script.
2183   my $git_input;
2184   $git_tar_count++;
2185   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2186     croak("Failed to save git archive: $!");
2187   }
2188   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2189   close($git_input);
2190   waitpid($git_pid, 0);
2191   close(GIT_ARCHIVE);
2192   if ($?) {
2193     croak("Failed to save git archive: git exited " . exit_status_s($?));
2194   }
2195 }
2196
2197 sub combined_git_archive {
2198   # Combine all saved tar archives into a single archive, then return its
2199   # contents in a string.  Return undef if no archives have been saved.
2200   if ($git_tar_count < 1) {
2201     return undef;
2202   }
2203   my $base_tar_name = tar_filename_n(1);
2204   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2205     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2206     if ($tar_exit != 0) {
2207       croak("Error preparing build archive: tar -A exited " .
2208             exit_status_s($tar_exit));
2209     }
2210   }
2211   if (!open(GIT_TAR, "<", $base_tar_name)) {
2212     croak("Could not open build archive: $!");
2213   }
2214   my $tar_contents = handle_readall(\*GIT_TAR);
2215   close(GIT_TAR);
2216   return $tar_contents;
2217 }
2218
2219 sub set_nonblocking {
2220   my $fh = shift;
2221   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2222   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2223 }
2224
2225 __DATA__
2226 #!/usr/bin/env perl
2227 #
2228 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2229 # server invokes this script on individual compute nodes, or localhost if we're
2230 # running a job locally.  It gets called in two modes:
2231 #
2232 # * No arguments: Installation mode.  Read a tar archive from the DATA
2233 #   file handle; it includes the Crunch script's source code, and
2234 #   maybe SDKs as well.  Those should be installed in the proper
2235 #   locations.  This runs outside of any Docker container, so don't try to
2236 #   introspect Crunch's runtime environment.
2237 #
2238 # * With arguments: Crunch script run mode.  This script should set up the
2239 #   environment, then run the command specified in the arguments.  This runs
2240 #   inside any Docker container.
2241
2242 use Fcntl ':flock';
2243 use File::Path qw( make_path remove_tree );
2244 use POSIX qw(getcwd);
2245
2246 use constant TASK_TEMPFAIL => 111;
2247
2248 # Map SDK subdirectories to the path environments they belong to.
2249 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2250
2251 my $destdir = $ENV{"CRUNCH_SRC"};
2252 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2253 my $repo = $ENV{"CRUNCH_SRC_URL"};
2254 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2255 my $job_work = $ENV{"JOB_WORK"};
2256 my $task_work = $ENV{"TASK_WORK"};
2257
2258 open(STDOUT_ORIG, ">&", STDOUT);
2259 open(STDERR_ORIG, ">&", STDERR);
2260
2261 for my $dir ($destdir, $job_work, $task_work) {
2262   if ($dir) {
2263     make_path $dir;
2264     -e $dir or die "Failed to create temporary directory ($dir): $!";
2265   }
2266 }
2267
2268 if ($task_work) {
2269   remove_tree($task_work, {keep_root => 1});
2270 }
2271
2272 ### Crunch script run mode
2273 if (@ARGV) {
2274   # We want to do routine logging during task 0 only.  This gives the user
2275   # the information they need, but avoids repeating the information for every
2276   # task.
2277   my $Log;
2278   if ($ENV{TASK_SEQUENCE} eq "0") {
2279     $Log = sub {
2280       my $msg = shift;
2281       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2282     };
2283   } else {
2284     $Log = sub { };
2285   }
2286
2287   my $python_src = "$install_dir/python";
2288   my $venv_dir = "$job_work/.arvados.venv";
2289   my $venv_built = -e "$venv_dir/bin/activate";
2290   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2291     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2292                  "--python=python2.7", $venv_dir);
2293     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2294     $venv_built = 1;
2295     $Log->("Built Python SDK virtualenv");
2296   }
2297
2298   my @pysdk_version_cmd = ("python", "-c",
2299     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2300   if ($venv_built) {
2301     $Log->("Running in Python SDK virtualenv");
2302     @pysdk_version_cmd = ();
2303     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2304     @ARGV = ("/bin/sh", "-ec",
2305              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2306   } elsif (-d $python_src) {
2307     $Log->("Warning: virtualenv not found inside Docker container default " .
2308            "\$PATH. Can't install Python SDK.");
2309   }
2310
2311   if (@pysdk_version_cmd) {
2312     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2313     my $pysdk_version = <$pysdk_version_pipe>;
2314     close($pysdk_version_pipe);
2315     if ($? == 0) {
2316       chomp($pysdk_version);
2317       $Log->("Using Arvados SDK version $pysdk_version");
2318     } else {
2319       # A lot could've gone wrong here, but pretty much all of it means that
2320       # Python won't be able to load the Arvados SDK.
2321       $Log->("Warning: Arvados SDK not found");
2322     }
2323   }
2324
2325   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2326     my $sdk_path = "$install_dir/$sdk_dir";
2327     if (-d $sdk_path) {
2328       if ($ENV{$sdk_envkey}) {
2329         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2330       } else {
2331         $ENV{$sdk_envkey} = $sdk_path;
2332       }
2333       $Log->("Arvados SDK added to %s", $sdk_envkey);
2334     }
2335   }
2336
2337   exec(@ARGV);
2338   die "Cannot exec `@ARGV`: $!";
2339 }
2340
2341 ### Installation mode
2342 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2343 flock L, LOCK_EX;
2344 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2345   # This exact git archive (source + arvados sdk) is already installed
2346   # here, so there's no need to reinstall it.
2347
2348   # We must consume our DATA section, though: otherwise the process
2349   # feeding it to us will get SIGPIPE.
2350   my $buf;
2351   while (read(DATA, $buf, 65536)) { }
2352
2353   exit(0);
2354 }
2355
2356 unlink "$destdir.archive_hash";
2357 mkdir $destdir;
2358
2359 do {
2360   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2361   local $SIG{PIPE} = "IGNORE";
2362   warn "Extracting archive: $archive_hash\n";
2363   # --ignore-zeros is necessary sometimes: depending on how much NUL
2364   # padding tar -A put on our combined archive (which in turn depends
2365   # on the length of the component archives) tar without
2366   # --ignore-zeros will exit before consuming stdin and cause close()
2367   # to fail on the resulting SIGPIPE.
2368   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2369     die "Error launching 'tar -xC $destdir': $!";
2370   }
2371   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2372   # get SIGPIPE.  We must feed it data incrementally.
2373   my $tar_input;
2374   while (read(DATA, $tar_input, 65536)) {
2375     print TARX $tar_input;
2376   }
2377   if(!close(TARX)) {
2378     die "'tar -xC $destdir' exited $?: $!";
2379   }
2380 };
2381
2382 mkdir $install_dir;
2383
2384 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2385 if (-d $sdk_root) {
2386   foreach my $sdk_lang (("python",
2387                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2388     if (-d "$sdk_root/$sdk_lang") {
2389       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2390         die "Failed to install $sdk_lang SDK: $!";
2391       }
2392     }
2393   }
2394 }
2395
2396 my $python_dir = "$install_dir/python";
2397 if ((-d $python_dir) and can_run("python2.7")) {
2398   open(my $egg_info_pipe, "-|",
2399        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2400   my @egg_info_errors = <$egg_info_pipe>;
2401   close($egg_info_pipe);
2402
2403   if ($?) {
2404     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2405       # egg_info apparently failed because it couldn't ask git for a build tag.
2406       # Specify no build tag.
2407       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2408       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2409       close($pysdk_cfg);
2410     } else {
2411       my $egg_info_exit = $? >> 8;
2412       foreach my $errline (@egg_info_errors) {
2413         warn $errline;
2414       }
2415       warn "python setup.py egg_info failed: exit $egg_info_exit";
2416       exit ($egg_info_exit || 1);
2417     }
2418   }
2419 }
2420
2421 # Hide messages from the install script (unless it fails: shell_or_die
2422 # will show $destdir.log in that case).
2423 open(STDOUT, ">>", "$destdir.log");
2424 open(STDERR, ">&", STDOUT);
2425
2426 if (-e "$destdir/crunch_scripts/install") {
2427     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2428 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2429     # Old version
2430     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2431 } elsif (-e "./install.sh") {
2432     shell_or_die (undef, "./install.sh", $install_dir);
2433 }
2434
2435 if ($archive_hash) {
2436     unlink "$destdir.archive_hash.new";
2437     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2438     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2439 }
2440
2441 close L;
2442
2443 sub can_run {
2444   my $command_name = shift;
2445   open(my $which, "-|", "which", $command_name);
2446   while (<$which>) { }
2447   close($which);
2448   return ($? == 0);
2449 }
2450
2451 sub shell_or_die
2452 {
2453   my $exitcode = shift;
2454
2455   if ($ENV{"DEBUG"}) {
2456     print STDERR "@_\n";
2457   }
2458   if (system (@_) != 0) {
2459     my $err = $!;
2460     my $code = $?;
2461     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2462     open STDERR, ">&STDERR_ORIG";
2463     system ("cat $destdir.log >&2");
2464     warn "@_ failed ($err): $exitstatus";
2465     if (defined($exitcode)) {
2466       exit $exitcode;
2467     }
2468     else {
2469       exit (($code >> 8) || 1);
2470     }
2471   }
2472 }
2473
2474 __DATA__