8099: When invoking setup tasks via srun, check slurm queue and propagate stderr...
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 my $docker_run_args = "";
131 GetOptions('force-unlock' => \$force_unlock,
132            'git-dir=s' => \$git_dir,
133            'job=s' => \$jobspec,
134            'job-api-token=s' => \$job_api_token,
135            'no-clear-tmp' => \$no_clear_tmp,
136            'resume-stash=s' => \$resume_stash,
137            'docker-bin=s' => \$docker_bin,
138            'docker-run-args=s' => \$docker_run_args,
139     );
140
141 if (defined $job_api_token) {
142   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
143 }
144
145 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
146
147
148 $SIG{'USR1'} = sub
149 {
150   $main::ENV{CRUNCH_DEBUG} = 1;
151 };
152 $SIG{'USR2'} = sub
153 {
154   $main::ENV{CRUNCH_DEBUG} = 0;
155 };
156
157 my $arv = Arvados->new('apiVersion' => 'v1');
158
159 my $Job;
160 my $job_id;
161 my $dbh;
162 my $sth;
163 my @jobstep;
164
165 my $local_job;
166 if ($jobspec =~ /^[-a-z\d]+$/)
167 {
168   # $jobspec is an Arvados UUID, not a JSON job specification
169   $Job = api_call("jobs/get", uuid => $jobspec);
170   $local_job = 0;
171 }
172 else
173 {
174   $local_job = JSON::decode_json($jobspec);
175 }
176
177
178 # Make sure our workers (our slurm nodes, localhost, or whatever) are
179 # at least able to run basic commands: they aren't down or severely
180 # misconfigured.
181 my $cmd = ['true'];
182 if (($Job || $local_job)->{docker_image_locator}) {
183   $cmd = [$docker_bin, 'ps', '-q'];
184 }
185 Log(undef, "Sanity check is `@$cmd`");
186 my ($exited, $stdout, $stderr) = srun_sync(
187   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
188   $cmd,
189   {label => "sanity check"});
190 if ($exited != 0) {
191   Log(undef, "Sanity check failed: ".exit_status_s($exited));
192   exit EX_TEMPFAIL;
193 }
194 Log(undef, "Sanity check OK");
195
196
197 my $User = api_call("users/current");
198
199 if (!$local_job) {
200   if (!$force_unlock) {
201     # Claim this job, and make sure nobody else does
202     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
203     if ($@) {
204       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
205       exit EX_TEMPFAIL;
206     };
207   }
208 }
209 else
210 {
211   if (!$resume_stash)
212   {
213     map { croak ("No $_ specified") unless $local_job->{$_} }
214     qw(script script_version script_parameters);
215   }
216
217   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
218   $local_job->{'started_at'} = gmtime;
219   $local_job->{'state'} = 'Running';
220
221   $Job = api_call("jobs/create", job => $local_job);
222 }
223 $job_id = $Job->{'uuid'};
224
225 my $keep_logfile = $job_id . '.log.txt';
226 log_writer_start($keep_logfile);
227
228 $Job->{'runtime_constraints'} ||= {};
229 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
230 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
231
232 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
233 if ($? == 0) {
234   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
235   chomp($gem_versions);
236   chop($gem_versions);  # Closing parentheses
237 } else {
238   $gem_versions = "";
239 }
240 Log(undef,
241     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
242
243 Log (undef, "check slurm allocation");
244 my @slot;
245 my @node;
246 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
247 my @sinfo;
248 if (!$have_slurm)
249 {
250   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
251   push @sinfo, "$localcpus localhost";
252 }
253 if (exists $ENV{SLURM_NODELIST})
254 {
255   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
256 }
257 foreach (@sinfo)
258 {
259   my ($ncpus, $slurm_nodelist) = split;
260   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
261
262   my @nodelist;
263   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
264   {
265     my $nodelist = $1;
266     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
267     {
268       my $ranges = $1;
269       foreach (split (",", $ranges))
270       {
271         my ($a, $b);
272         if (/(\d+)-(\d+)/)
273         {
274           $a = $1;
275           $b = $2;
276         }
277         else
278         {
279           $a = $_;
280           $b = $_;
281         }
282         push @nodelist, map {
283           my $n = $nodelist;
284           $n =~ s/\[[-,\d]+\]/$_/;
285           $n;
286         } ($a..$b);
287       }
288     }
289     else
290     {
291       push @nodelist, $nodelist;
292     }
293   }
294   foreach my $nodename (@nodelist)
295   {
296     Log (undef, "node $nodename - $ncpus slots");
297     my $node = { name => $nodename,
298                  ncpus => $ncpus,
299                  # The number of consecutive times a task has been dispatched
300                  # to this node and failed.
301                  losing_streak => 0,
302                  # The number of consecutive times that SLURM has reported
303                  # a node failure since the last successful task.
304                  fail_count => 0,
305                  # Don't dispatch work to this node until this time
306                  # (in seconds since the epoch) has passed.
307                  hold_until => 0 };
308     foreach my $cpu (1..$ncpus)
309     {
310       push @slot, { node => $node,
311                     cpu => $cpu };
312     }
313   }
314   push @node, @nodelist;
315 }
316
317
318
319 # Ensure that we get one jobstep running on each allocated node before
320 # we start overloading nodes with concurrent steps
321
322 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
323
324
325 $Job->update_attributes(
326   'tasks_summary' => { 'failed' => 0,
327                        'todo' => 1,
328                        'running' => 0,
329                        'done' => 0 });
330
331 Log (undef, "start");
332 $SIG{'INT'} = sub { $main::please_freeze = 1; };
333 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
334 $SIG{'TERM'} = \&croak;
335 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
336 $SIG{'ALRM'} = sub { $main::please_info = 1; };
337 $SIG{'CONT'} = sub { $main::please_continue = 1; };
338 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
339
340 $main::please_freeze = 0;
341 $main::please_info = 0;
342 $main::please_continue = 0;
343 $main::please_refresh = 0;
344 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
345
346 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
347 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
348 $ENV{"JOB_UUID"} = $job_id;
349
350
351 my @jobstep_todo = ();
352 my @jobstep_done = ();
353 my @jobstep_tomerge = ();
354 my $jobstep_tomerge_level = 0;
355 my $squeue_checked = 0;
356 my $latest_refresh = scalar time;
357
358
359
360 if (defined $Job->{thawedfromkey})
361 {
362   thaw ($Job->{thawedfromkey});
363 }
364 else
365 {
366   my $first_task = api_call("job_tasks/create", job_task => {
367     'job_uuid' => $Job->{'uuid'},
368     'sequence' => 0,
369     'qsequence' => 0,
370     'parameters' => {},
371   });
372   push @jobstep, { 'level' => 0,
373                    'failures' => 0,
374                    'arvados_task' => $first_task,
375                  };
376   push @jobstep_todo, 0;
377 }
378
379
380 if (!$have_slurm)
381 {
382   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
383 }
384
385 my $build_script = handle_readall(\*DATA);
386 my $nodelist = join(",", @node);
387 my $git_tar_count = 0;
388
389 if (!defined $no_clear_tmp) {
390   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
391   # up work directories crunch_tmp/work, crunch_tmp/opt,
392   # crunch_tmp/src*.
393   #
394   # TODO: When #5036 is done and widely deployed, we can limit mount's
395   # -t option to simply fuse.keep.
396   my ($exited, $stdout, $stderr) = srun_sync(
397     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398     ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
399     {label => "clean work dirs"});
400   if ($exited != 0) {
401     exit(EX_RETRY_UNLOCKED);
402   }
403 }
404
405 # If this job requires a Docker image, install that.
406 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
407 if ($docker_locator = $Job->{docker_image_locator}) {
408   Log (undef, "Install docker image $docker_locator");
409   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
410   if (!$docker_hash)
411   {
412     croak("No Docker image hash found from locator $docker_locator");
413   }
414   Log (undef, "docker image hash is $docker_hash");
415   $docker_stream =~ s/^\.//;
416   my $docker_install_script = qq{
417 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
418     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
419 fi
420 };
421
422   my ($exited, $stdout, $stderr) = srun_sync(
423     ["srun", "--nodelist=" . join(',', @node)],
424     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
425     {label => "load docker image"});
426   if ($exited != 0)
427   {
428     exit(EX_RETRY_UNLOCKED);
429   }
430
431   # Determine whether this version of Docker supports memory+swap limits.
432   ($exited, $stdout, $stderr) = srun_sync(
433     ["srun", "--nodelist=" . $node[0]],
434     [$docker_bin, 'run', '--help'],
435     {label => "check --memory-swap feature"});
436   $docker_limitmem = ($stdout =~ /--memory-swap/);
437
438   # Find a non-root Docker user to use.
439   # Tries the default user for the container, then 'crunch', then 'nobody',
440   # testing for whether the actual user id is non-zero.  This defends against
441   # mistakes but not malice, but we intend to harden the security in the future
442   # so we don't want anyone getting used to their jobs running as root in their
443   # Docker containers.
444   my @tryusers = ("", "crunch", "nobody");
445   foreach my $try_user (@tryusers) {
446     my $label;
447     my $try_user_arg;
448     if ($try_user eq "") {
449       $label = "check whether default user is UID 0";
450       $try_user_arg = "";
451     } else {
452       $label = "check whether user '$try_user' is UID 0";
453       $try_user_arg = "--user=$try_user";
454     }
455     my ($exited, $stdout, $stderr) = srun_sync(
456       ["srun", "--nodelist=" . $node[0]],
457       ["/bin/sh", "-ec",
458        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
459       {label => $label});
460     chomp($stdout);
461     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
462       $dockeruserarg = $try_user_arg;
463       if ($try_user eq "") {
464         Log(undef, "Container will run with default user");
465       } else {
466         Log(undef, "Container will run with $dockeruserarg");
467       }
468       last;
469     }
470   }
471
472   if (!defined $dockeruserarg) {
473     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
474   }
475
476   if ($Job->{arvados_sdk_version}) {
477     # The job also specifies an Arvados SDK version.  Add the SDKs to the
478     # tar file for the build script to install.
479     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
480                        $Job->{arvados_sdk_version}));
481     add_git_archive("git", "--git-dir=$git_dir", "archive",
482                     "--prefix=.arvados.sdk/",
483                     $Job->{arvados_sdk_version}, "sdk");
484   }
485 }
486
487 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
488   # If script_version looks like an absolute path, *and* the --git-dir
489   # argument was not given -- which implies we were not invoked by
490   # crunch-dispatch -- we will use the given path as a working
491   # directory instead of resolving script_version to a git commit (or
492   # doing anything else with git).
493   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
494   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
495 }
496 else {
497   # Resolve the given script_version to a git commit sha1. Also, if
498   # the repository is remote, clone it into our local filesystem: this
499   # ensures "git archive" will work, and is necessary to reliably
500   # resolve a symbolic script_version like "master^".
501   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
502
503   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
504
505   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
506
507   # If we're running under crunch-dispatch, it will have already
508   # pulled the appropriate source tree into its own repository, and
509   # given us that repo's path as $git_dir.
510   #
511   # If we're running a "local" job, we might have to fetch content
512   # from a remote repository.
513   #
514   # (Currently crunch-dispatch gives a local path with --git-dir, but
515   # we might as well accept URLs there too in case it changes its
516   # mind.)
517   my $repo = $git_dir || $Job->{'repository'};
518
519   # Repository can be remote or local. If remote, we'll need to fetch it
520   # to a local dir before doing `git log` et al.
521   my $repo_location;
522
523   if ($repo =~ m{://|^[^/]*:}) {
524     # $repo is a git url we can clone, like git:// or https:// or
525     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
526     # not recognized here because distinguishing that from a local
527     # path is too fragile. If you really need something strange here,
528     # use the ssh:// form.
529     $repo_location = 'remote';
530   } elsif ($repo =~ m{^\.*/}) {
531     # $repo is a local path to a git index. We'll also resolve ../foo
532     # to ../foo/.git if the latter is a directory. To help
533     # disambiguate local paths from named hosted repositories, this
534     # form must be given as ./ or ../ if it's a relative path.
535     if (-d "$repo/.git") {
536       $repo = "$repo/.git";
537     }
538     $repo_location = 'local';
539   } else {
540     # $repo is none of the above. It must be the name of a hosted
541     # repository.
542     my $arv_repo_list = api_call("repositories/list",
543                                  'filters' => [['name','=',$repo]]);
544     my @repos_found = @{$arv_repo_list->{'items'}};
545     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
546     if ($n_found > 0) {
547       Log(undef, "Repository '$repo' -> "
548           . join(", ", map { $_->{'uuid'} } @repos_found));
549     }
550     if ($n_found != 1) {
551       croak("Error: Found $n_found repositories with name '$repo'.");
552     }
553     $repo = $repos_found[0]->{'fetch_url'};
554     $repo_location = 'remote';
555   }
556   Log(undef, "Using $repo_location repository '$repo'");
557   $ENV{"CRUNCH_SRC_URL"} = $repo;
558
559   # Resolve given script_version (we'll call that $treeish here) to a
560   # commit sha1 ($commit).
561   my $treeish = $Job->{'script_version'};
562   my $commit;
563   if ($repo_location eq 'remote') {
564     # We minimize excess object-fetching by re-using the same bare
565     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
566     # just keep adding remotes to it as needed.
567     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
568     my $gitcmd = "git --git-dir=\Q$local_repo\E";
569
570     # Set up our local repo for caching remote objects, making
571     # archives, etc.
572     if (!-d $local_repo) {
573       make_path($local_repo) or croak("Error: could not create $local_repo");
574     }
575     # This works (exits 0 and doesn't delete fetched objects) even
576     # if $local_repo is already initialized:
577     `$gitcmd init --bare`;
578     if ($?) {
579       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
580     }
581
582     # If $treeish looks like a hash (or abbrev hash) we look it up in
583     # our local cache first, since that's cheaper. (We don't want to
584     # do that with tags/branches though -- those change over time, so
585     # they should always be resolved by the remote repo.)
586     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
587       # Hide stderr because it's normal for this to fail:
588       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
589       if ($? == 0 &&
590           # Careful not to resolve a branch named abcdeff to commit 1234567:
591           $sha1 =~ /^$treeish/ &&
592           $sha1 =~ /^([0-9a-f]{40})$/s) {
593         $commit = $1;
594         Log(undef, "Commit $commit already present in $local_repo");
595       }
596     }
597
598     if (!defined $commit) {
599       # If $treeish isn't just a hash or abbrev hash, or isn't here
600       # yet, we need to fetch the remote to resolve it correctly.
601
602       # First, remove all local heads. This prevents a name that does
603       # not exist on the remote from resolving to (or colliding with)
604       # a previously fetched branch or tag (possibly from a different
605       # remote).
606       remove_tree("$local_repo/refs/heads", {keep_root => 1});
607
608       Log(undef, "Fetching objects from $repo to $local_repo");
609       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
610       if ($?) {
611         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
612       }
613     }
614
615     # Now that the data is all here, we will use our local repo for
616     # the rest of our git activities.
617     $repo = $local_repo;
618   }
619
620   my $gitcmd = "git --git-dir=\Q$repo\E";
621   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
622   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
623     croak("`$gitcmd rev-list` exited "
624           .exit_status_s($?)
625           .", '$treeish' not found, giving up");
626   }
627   $commit = $1;
628   Log(undef, "Version $treeish is commit $commit");
629
630   if ($commit ne $Job->{'script_version'}) {
631     # Record the real commit id in the database, frozentokey, logs,
632     # etc. -- instead of an abbreviation or a branch name which can
633     # become ambiguous or point to a different commit in the future.
634     if (!$Job->update_attributes('script_version' => $commit)) {
635       croak("Error: failed to update job's script_version attribute");
636     }
637   }
638
639   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
640   add_git_archive("$gitcmd archive ''\Q$commit\E");
641 }
642
643 my $git_archive = combined_git_archive();
644 if (!defined $git_archive) {
645   Log(undef, "Skip install phase (no git archive)");
646   if ($have_slurm) {
647     Log(undef, "Warning: This probably means workers have no source tree!");
648   }
649 }
650 else {
651   my $exited;
652   my $install_script_tries_left = 3;
653   for (my $attempts = 0; $attempts < 3; $attempts++) {
654     my @srunargs = ("srun",
655                     "--nodelist=$nodelist",
656                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
657     my @execargs = ("sh", "-c",
658                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
659
660     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
661     my ($stdout, $stderr);
662     ($exited, $stdout, $stderr) = srun_sync(
663       \@srunargs, \@execargs,
664       {label => "run install script on all workers"},
665       $build_script . $git_archive);
666
667     my $stderr_anything_from_script = 0;
668     for my $line (split(/\n/, $stderr)) {
669       if ($line !~ /^(srun: error: |starting: \[)/) {
670         $stderr_anything_from_script = 1;
671       }
672     }
673
674     last if $exited == 0 || $main::please_freeze;
675
676     # If the install script fails but doesn't print an error message,
677     # the next thing anyone is likely to do is just run it again in
678     # case it was a transient problem like "slurm communication fails
679     # because the network isn't reliable enough". So we'll just do
680     # that ourselves (up to 3 attempts in total). OTOH, if there is an
681     # error message, the problem is more likely to have a real fix and
682     # we should fail the job so the fixing process can start, instead
683     # of doing 2 more attempts.
684     last if $stderr_anything_from_script;
685   }
686
687   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
688     unlink($tar_filename);
689   }
690
691   if ($exited != 0) {
692     croak("Giving up");
693   }
694 }
695
696 foreach (qw (script script_version script_parameters runtime_constraints))
697 {
698   Log (undef,
699        "$_ " .
700        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
701 }
702 foreach (split (/\n/, $Job->{knobs}))
703 {
704   Log (undef, "knob " . $_);
705 }
706
707
708
709 $main::success = undef;
710
711
712
713 ONELEVEL:
714
715 my $thisround_succeeded = 0;
716 my $thisround_failed = 0;
717 my $thisround_failed_multiple = 0;
718 my $working_slot_count = scalar(@slot);
719
720 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
721                        or $a <=> $b } @jobstep_todo;
722 my $level = $jobstep[$jobstep_todo[0]]->{level};
723
724 my $initial_tasks_this_level = 0;
725 foreach my $id (@jobstep_todo) {
726   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
727 }
728
729 # If the number of tasks scheduled at this level #T is smaller than the number
730 # of slots available #S, only use the first #T slots, or the first slot on
731 # each node, whichever number is greater.
732 #
733 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
734 # based on these numbers.  Using fewer slots makes more resources available
735 # to each individual task, which should normally be a better strategy when
736 # there are fewer of them running with less parallelism.
737 #
738 # Note that this calculation is not redone if the initial tasks at
739 # this level queue more tasks at the same level.  This may harm
740 # overall task throughput for that level.
741 my @freeslot;
742 if ($initial_tasks_this_level < @node) {
743   @freeslot = (0..$#node);
744 } elsif ($initial_tasks_this_level < @slot) {
745   @freeslot = (0..$initial_tasks_this_level - 1);
746 } else {
747   @freeslot = (0..$#slot);
748 }
749 my $round_num_freeslots = scalar(@freeslot);
750
751 my %round_max_slots = ();
752 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
753   my $this_slot = $slot[$freeslot[$ii]];
754   my $node_name = $this_slot->{node}->{name};
755   $round_max_slots{$node_name} ||= $this_slot->{cpu};
756   last if (scalar(keys(%round_max_slots)) >= @node);
757 }
758
759 Log(undef, "start level $level with $round_num_freeslots slots");
760 my @holdslot;
761 my %reader;
762 my $progress_is_dirty = 1;
763 my $progress_stats_updated = 0;
764
765 update_progress_stats();
766
767
768 THISROUND:
769 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
770 {
771   # Don't create new tasks if we already know the job's final result.
772   last if defined($main::success);
773
774   my $id = $jobstep_todo[$todo_ptr];
775   my $Jobstep = $jobstep[$id];
776   if ($Jobstep->{level} != $level)
777   {
778     next;
779   }
780
781   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
782   set_nonblocking($reader{$id});
783
784   my $childslot = $freeslot[0];
785   my $childnode = $slot[$childslot]->{node};
786   my $childslotname = join (".",
787                             $slot[$childslot]->{node}->{name},
788                             $slot[$childslot]->{cpu});
789
790   my $childpid = fork();
791   if ($childpid == 0)
792   {
793     $SIG{'INT'} = 'DEFAULT';
794     $SIG{'QUIT'} = 'DEFAULT';
795     $SIG{'TERM'} = 'DEFAULT';
796
797     foreach (values (%reader))
798     {
799       close($_);
800     }
801     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
802     open(STDOUT,">&writer");
803     open(STDERR,">&writer");
804
805     undef $dbh;
806     undef $sth;
807
808     delete $ENV{"GNUPGHOME"};
809     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
810     $ENV{"TASK_QSEQUENCE"} = $id;
811     $ENV{"TASK_SEQUENCE"} = $level;
812     $ENV{"JOB_SCRIPT"} = $Job->{script};
813     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
814       $param =~ tr/a-z/A-Z/;
815       $ENV{"JOB_PARAMETER_$param"} = $value;
816     }
817     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
818     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
819     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
820     $ENV{"HOME"} = $ENV{"TASK_WORK"};
821     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
822     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
823     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
824
825     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
826
827     $ENV{"GZIP"} = "-n";
828
829     my @srunargs = (
830       "srun",
831       "--nodelist=".$childnode->{name},
832       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
833       "--job-name=$job_id.$id.$$",
834         );
835
836     my $stdbuf = " stdbuf --output=0 --error=0 ";
837
838     my $arv_file_cache = "";
839     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
840       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
841     }
842
843     my $command =
844         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
845         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
846         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
847         # These environment variables get used explicitly later in
848         # $command.  No tool is expected to read these values directly.
849         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
850         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
851         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
852         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
853
854     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
855     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
856     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
857
858     if ($docker_hash)
859     {
860       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
861       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
862       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
863       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
864       # We only set memory limits if Docker lets us limit both memory and swap.
865       # Memory limits alone have been supported longer, but subprocesses tend
866       # to get SIGKILL if they exceed that without any swap limit set.
867       # See #5642 for additional background.
868       if ($docker_limitmem) {
869         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
870       }
871
872       # The source tree and $destdir directory (which we have
873       # installed on the worker host) are available in the container,
874       # under the same path.
875       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
876       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
877
878       # Currently, we make the "by_pdh" directory in arv-mount's mount
879       # point appear at /keep inside the container (instead of using
880       # the same path as the host like we do with CRUNCH_SRC and
881       # CRUNCH_INSTALL). However, crunch scripts and utilities must
882       # not rely on this. They must use $TASK_KEEPMOUNT.
883       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
884       $ENV{TASK_KEEPMOUNT} = "/keep";
885
886       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
887       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
888       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
889
890       # TASK_WORK is almost exactly like a docker data volume: it
891       # starts out empty, is writable, and persists until no
892       # containers use it any more. We don't use --volumes-from to
893       # share it with other containers: it is only accessible to this
894       # task, and it goes away when this task stops.
895       #
896       # However, a docker data volume is writable only by root unless
897       # the mount point already happens to exist in the container with
898       # different permissions. Therefore, we [1] assume /tmp already
899       # exists in the image and is writable by the crunch user; [2]
900       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
901       # writable if they are created by docker while setting up the
902       # other --volumes); and [3] create $TASK_WORK inside the
903       # container using $build_script.
904       $command .= "--volume=/tmp ";
905       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
906       $ENV{"HOME"} = $ENV{"TASK_WORK"};
907       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
908
909       # TODO: Share a single JOB_WORK volume across all task
910       # containers on a given worker node, and delete it when the job
911       # ends (and, in case that doesn't work, when the next job
912       # starts).
913       #
914       # For now, use the same approach as TASK_WORK above.
915       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
916
917       while (my ($env_key, $env_val) = each %ENV)
918       {
919         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
920           $command .= "--env=\Q$env_key=$env_val\E ";
921         }
922       }
923       $command .= "--env=\QHOME=$ENV{HOME}\E ";
924       $command .= "\Q$docker_hash\E ";
925
926       if ($Job->{arvados_sdk_version}) {
927         $command .= $stdbuf;
928         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
929       } else {
930         $command .= "/bin/sh -c \'python -c " .
931             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
932             ">&2 2>/dev/null; " .
933             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
934             "if which stdbuf >/dev/null ; then " .
935             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
936             " else " .
937             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
938             " fi\'";
939       }
940     } else {
941       # Non-docker run
942       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
943       $command .= $stdbuf;
944       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
945     }
946
947     my @execargs = ('bash', '-c', $command);
948     srun (\@srunargs, \@execargs, undef, $build_script);
949     # exec() failed, we assume nothing happened.
950     die "srun() failed on build script\n";
951   }
952   close("writer");
953   if (!defined $childpid)
954   {
955     close $reader{$id};
956     delete $reader{$id};
957     next;
958   }
959   shift @freeslot;
960   $proc{$childpid} = {
961     jobstepidx => $id,
962     time => time,
963     slot => $childslot,
964     jobstepname => "$job_id.$id.$childpid",
965   };
966   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
967   $slot[$childslot]->{pid} = $childpid;
968
969   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
970   Log ($id, "child $childpid started on $childslotname");
971   $Jobstep->{starttime} = time;
972   $Jobstep->{node} = $childnode->{name};
973   $Jobstep->{slotindex} = $childslot;
974   delete $Jobstep->{stderr};
975   delete $Jobstep->{finishtime};
976   delete $Jobstep->{tempfail};
977
978   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
979   $Jobstep->{'arvados_task'}->save;
980
981   splice @jobstep_todo, $todo_ptr, 1;
982   --$todo_ptr;
983
984   $progress_is_dirty = 1;
985
986   while (!@freeslot
987          ||
988          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
989   {
990     last THISROUND if $main::please_freeze;
991     if ($main::please_info)
992     {
993       $main::please_info = 0;
994       freeze();
995       create_output_collection();
996       save_meta(1);
997       update_progress_stats();
998     }
999     my $gotsome
1000         = readfrompipes ()
1001         + reapchildren ();
1002     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1003     {
1004       check_refresh_wanted();
1005       check_squeue();
1006       update_progress_stats();
1007     }
1008     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1009     {
1010       update_progress_stats();
1011     }
1012     if (!$gotsome) {
1013       select (undef, undef, undef, 0.1);
1014     }
1015     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1016                                         $_->{node}->{hold_count} < 4 } @slot);
1017     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1018         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1019     {
1020       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1021           .($thisround_failed+$thisround_succeeded)
1022           .") -- giving up on this round";
1023       Log (undef, $message);
1024       last THISROUND;
1025     }
1026
1027     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1028     for (my $i=$#freeslot; $i>=0; $i--) {
1029       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1030         push @holdslot, (splice @freeslot, $i, 1);
1031       }
1032     }
1033     for (my $i=$#holdslot; $i>=0; $i--) {
1034       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1035         push @freeslot, (splice @holdslot, $i, 1);
1036       }
1037     }
1038
1039     # give up if no nodes are succeeding
1040     if ($working_slot_count < 1) {
1041       Log(undef, "Every node has failed -- giving up");
1042       last THISROUND;
1043     }
1044   }
1045 }
1046
1047
1048 push @freeslot, splice @holdslot;
1049 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1050
1051
1052 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1053 while (%proc)
1054 {
1055   if ($main::please_continue) {
1056     $main::please_continue = 0;
1057     goto THISROUND;
1058   }
1059   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1060   readfrompipes ();
1061   if (!reapchildren())
1062   {
1063     check_refresh_wanted();
1064     check_squeue();
1065     update_progress_stats();
1066     select (undef, undef, undef, 0.1);
1067     killem (keys %proc) if $main::please_freeze;
1068   }
1069 }
1070
1071 update_progress_stats();
1072 freeze_if_want_freeze();
1073
1074
1075 if (!defined $main::success)
1076 {
1077   if (!@jobstep_todo) {
1078     $main::success = 1;
1079   } elsif ($working_slot_count < 1) {
1080     save_output_collection();
1081     save_meta();
1082     exit(EX_RETRY_UNLOCKED);
1083   } elsif ($thisround_succeeded == 0 &&
1084            ($thisround_failed == 0 || $thisround_failed > 4)) {
1085     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1086     Log (undef, $message);
1087     $main::success = 0;
1088   }
1089 }
1090
1091 goto ONELEVEL if !defined $main::success;
1092
1093
1094 release_allocation();
1095 freeze();
1096 my $collated_output = save_output_collection();
1097 Log (undef, "finish");
1098
1099 save_meta();
1100
1101 my $final_state;
1102 if ($collated_output && $main::success) {
1103   $final_state = 'Complete';
1104 } else {
1105   $final_state = 'Failed';
1106 }
1107 $Job->update_attributes('state' => $final_state);
1108
1109 exit (($final_state eq 'Complete') ? 0 : 1);
1110
1111
1112
1113 sub update_progress_stats
1114 {
1115   $progress_stats_updated = time;
1116   return if !$progress_is_dirty;
1117   my ($todo, $done, $running) = (scalar @jobstep_todo,
1118                                  scalar @jobstep_done,
1119                                  scalar keys(%proc));
1120   $Job->{'tasks_summary'} ||= {};
1121   $Job->{'tasks_summary'}->{'todo'} = $todo;
1122   $Job->{'tasks_summary'}->{'done'} = $done;
1123   $Job->{'tasks_summary'}->{'running'} = $running;
1124   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1125   Log (undef, "status: $done done, $running running, $todo todo");
1126   $progress_is_dirty = 0;
1127 }
1128
1129
1130
1131 sub reapchildren
1132 {
1133   my $pid = waitpid (-1, WNOHANG);
1134   return 0 if $pid <= 0;
1135
1136   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1137                   . "."
1138                   . $slot[$proc{$pid}->{slot}]->{cpu});
1139   my $jobstepidx = $proc{$pid}->{jobstepidx};
1140   my $elapsed = time - $proc{$pid}->{time};
1141   my $Jobstep = $jobstep[$jobstepidx];
1142
1143   my $childstatus = $?;
1144   my $exitvalue = $childstatus >> 8;
1145   my $exitinfo = "exit ".exit_status_s($childstatus);
1146   $Jobstep->{'arvados_task'}->reload;
1147   my $task_success = $Jobstep->{'arvados_task'}->{success};
1148
1149   Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1150
1151   if (!defined $task_success) {
1152     # task did not indicate one way or the other --> fail
1153     Log($jobstepidx, sprintf(
1154           "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1155           exit_status_s($childstatus)));
1156     $Jobstep->{'arvados_task'}->{success} = 0;
1157     $Jobstep->{'arvados_task'}->save;
1158     $task_success = 0;
1159   }
1160
1161   if (!$task_success)
1162   {
1163     my $temporary_fail;
1164     $temporary_fail ||= $Jobstep->{tempfail};
1165     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1166
1167     ++$thisround_failed;
1168     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1169
1170     # Check for signs of a failed or misconfigured node
1171     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1172         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1173       # Don't count this against jobstep failure thresholds if this
1174       # node is already suspected faulty and srun exited quickly
1175       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1176           $elapsed < 5) {
1177         Log ($jobstepidx, "blaming failure on suspect node " .
1178              $slot[$proc{$pid}->{slot}]->{node}->{name});
1179         $temporary_fail ||= 1;
1180       }
1181       ban_node_by_slot($proc{$pid}->{slot});
1182     }
1183
1184     Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1185                              ++$Jobstep->{'failures'},
1186                              $temporary_fail ? 'temporary' : 'permanent',
1187                              $elapsed));
1188
1189     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1190       # Give up on this task, and the whole job
1191       $main::success = 0;
1192     }
1193     # Put this task back on the todo queue
1194     push @jobstep_todo, $jobstepidx;
1195     $Job->{'tasks_summary'}->{'failed'}++;
1196   }
1197   else
1198   {
1199     ++$thisround_succeeded;
1200     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1201     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1202     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1203     push @jobstep_done, $jobstepidx;
1204     Log ($jobstepidx, "success in $elapsed seconds");
1205   }
1206   $Jobstep->{exitcode} = $childstatus;
1207   $Jobstep->{finishtime} = time;
1208   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1209   $Jobstep->{'arvados_task'}->save;
1210   process_stderr_final ($jobstepidx);
1211   Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1212                            length($Jobstep->{'arvados_task'}->{output}),
1213                            $Jobstep->{'arvados_task'}->{output}));
1214
1215   close $reader{$jobstepidx};
1216   delete $reader{$jobstepidx};
1217   delete $slot[$proc{$pid}->{slot}]->{pid};
1218   push @freeslot, $proc{$pid}->{slot};
1219   delete $proc{$pid};
1220
1221   if ($task_success) {
1222     # Load new tasks
1223     my $newtask_list = [];
1224     my $newtask_results;
1225     do {
1226       $newtask_results = api_call(
1227         "job_tasks/list",
1228         'where' => {
1229           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1230         },
1231         'order' => 'qsequence',
1232         'offset' => scalar(@$newtask_list),
1233       );
1234       push(@$newtask_list, @{$newtask_results->{items}});
1235     } while (@{$newtask_results->{items}});
1236     foreach my $arvados_task (@$newtask_list) {
1237       my $jobstep = {
1238         'level' => $arvados_task->{'sequence'},
1239         'failures' => 0,
1240         'arvados_task' => $arvados_task
1241       };
1242       push @jobstep, $jobstep;
1243       push @jobstep_todo, $#jobstep;
1244     }
1245   }
1246
1247   $progress_is_dirty = 1;
1248   1;
1249 }
1250
1251 sub check_refresh_wanted
1252 {
1253   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1254   if (@stat &&
1255       $stat[9] > $latest_refresh &&
1256       # ...and we have actually locked the job record...
1257       $job_id eq $Job->{'uuid'}) {
1258     $latest_refresh = scalar time;
1259     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1260     for my $attr ('cancelled_at',
1261                   'cancelled_by_user_uuid',
1262                   'cancelled_by_client_uuid',
1263                   'state') {
1264       $Job->{$attr} = $Job2->{$attr};
1265     }
1266     if ($Job->{'state'} ne "Running") {
1267       if ($Job->{'state'} eq "Cancelled") {
1268         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1269       } else {
1270         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1271       }
1272       $main::success = 0;
1273       $main::please_freeze = 1;
1274     }
1275   }
1276 }
1277
1278 sub check_squeue
1279 {
1280   my $last_squeue_check = $squeue_checked;
1281
1282   # Do not call `squeue` or check the kill list more than once every
1283   # 15 seconds.
1284   return if $last_squeue_check > time - 15;
1285   $squeue_checked = time;
1286
1287   # Look for children from which we haven't received stderr data since
1288   # the last squeue check. If no such children exist, all procs are
1289   # alive and there's no need to even look at squeue.
1290   #
1291   # As long as the crunchstat poll interval (10s) is shorter than the
1292   # squeue check interval (15s) this should make the squeue check an
1293   # infrequent event.
1294   my $silent_procs = 0;
1295   for my $procinfo (values %proc)
1296   {
1297     my $jobstep = $jobstep[$procinfo->{jobstepidx}];
1298     if ($jobstep->{stderr_at} < $last_squeue_check)
1299     {
1300       $silent_procs++;
1301     }
1302   }
1303   return if $silent_procs == 0;
1304
1305   # use killem() on procs whose killtime is reached
1306   while (my ($pid, $procinfo) = each %proc)
1307   {
1308     my $jobstep = $jobstep[$procinfo->{jobstepidx}];
1309     if (exists $procinfo->{killtime}
1310         && $procinfo->{killtime} <= time
1311         && $jobstep->{stderr_at} < $last_squeue_check)
1312     {
1313       my $sincewhen = "";
1314       if ($jobstep->{stderr_at}) {
1315         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1316       }
1317       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1318       killem ($pid);
1319     }
1320   }
1321
1322   if (!$have_slurm)
1323   {
1324     # here is an opportunity to check for mysterious problems with local procs
1325     return;
1326   }
1327
1328   # Get a list of steps still running.  Note: squeue(1) says --steps
1329   # selects a format (which we override anyway) and allows us to
1330   # specify which steps we're interested in (which we don't).
1331   # Importantly, it also changes the meaning of %j from "job name" to
1332   # "step name" and (although this isn't mentioned explicitly in the
1333   # docs) switches from "one line per job" mode to "one line per step"
1334   # mode. Without it, we'd just get a list of one job, instead of a
1335   # list of N steps.
1336   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1337   if ($? != 0)
1338   {
1339     Log(undef, "warning: squeue exit status $? ($!)");
1340     return;
1341   }
1342   chop @squeue;
1343
1344   # which of my jobsteps are running, according to squeue?
1345   my %ok;
1346   for my $jobstepname (@squeue)
1347   {
1348     $ok{$jobstepname} = 1;
1349   }
1350
1351   # Check for child procs >60s old and not mentioned by squeue.
1352   while (my ($pid, $procinfo) = each %proc)
1353   {
1354     if ($procinfo->{time} < time - 60
1355         && $procinfo->{jobstepname}
1356         && !exists $ok{$procinfo->{jobstepname}}
1357         && !exists $procinfo->{killtime})
1358     {
1359       # According to slurm, this task has ended (successfully or not)
1360       # -- but our srun child hasn't exited. First we must wait (30
1361       # seconds) in case this is just a race between communication
1362       # channels. Then, if our srun child process still hasn't
1363       # terminated, we'll conclude some slurm communication
1364       # error/delay has caused the task to die without notifying srun,
1365       # and we'll kill srun ourselves.
1366       $procinfo->{killtime} = time + 30;
1367       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1368     }
1369   }
1370 }
1371
1372
1373 sub release_allocation
1374 {
1375   if ($have_slurm)
1376   {
1377     Log (undef, "release job allocation");
1378     system "scancel $ENV{SLURM_JOB_ID}";
1379   }
1380 }
1381
1382
1383 sub readfrompipes
1384 {
1385   my $gotsome = 0;
1386   foreach my $jobstepidx (keys %reader)
1387   {
1388     my $buf;
1389     if ($jobstep[$jobstepidx]->{stdout_r} &&
1390         0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536))
1391     {
1392       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1393       if (exists $jobstep[$jobstepidx]->{stdout_captured}) {
1394         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1395       }
1396       $gotsome = 1;
1397     }
1398     if (0 < sysread ($reader{$jobstepidx}, $buf, 65536))
1399     {
1400       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1401       $jobstep[$jobstepidx]->{stderr_at} = time;
1402       $jobstep[$jobstepidx]->{stderr} .= $buf;
1403       if (exists $jobstep[$jobstepidx]->{stderr_captured}) {
1404         $jobstep[$jobstepidx]->{stderr_captured} .= $buf;
1405       }
1406       $gotsome = 1;
1407
1408       # Consume everything up to the last \n
1409       preprocess_stderr ($jobstepidx);
1410
1411       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1412       {
1413         # If we get a lot of stderr without a newline, chop off the
1414         # front to avoid letting our buffer grow indefinitely.
1415         substr ($jobstep[$jobstepidx]->{stderr},
1416                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1417       }
1418     }
1419   }
1420   return $gotsome;
1421 }
1422
1423
1424 # Consume all full lines of stderr for a jobstep. Everything after the
1425 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1426 # returning.
1427 sub preprocess_stderr
1428 {
1429   my $jobstepidx = shift;
1430
1431   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1432     my $line = $1;
1433     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1434     Log ($jobstepidx, "stderr $line");
1435     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1436       # whoa.
1437       $main::please_freeze = 1;
1438     }
1439     elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
1440       # Skip the following tempfail checks if this srun proc isn't
1441       # attached to a particular worker slot.
1442     }
1443     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1444       my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1445       my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1446       $slot[$job_slot_index]->{node}->{fail_count}++;
1447       $jobstep[$jobstepidx]->{tempfail} = 1;
1448       ban_node_by_slot($job_slot_index);
1449     }
1450     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1451       $jobstep[$jobstepidx]->{tempfail} = 1;
1452       ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
1453     }
1454     elsif ($line =~ /arvados\.errors\.Keep/) {
1455       $jobstep[$jobstepidx]->{tempfail} = 1;
1456     }
1457   }
1458 }
1459
1460
1461 sub process_stderr_final
1462 {
1463   my $jobstepidx = shift;
1464   preprocess_stderr ($jobstepidx);
1465
1466   map {
1467     Log ($jobstepidx, "stderr $_");
1468   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1469   $jobstep[$jobstepidx]->{stderr} = '';
1470 }
1471
1472 sub fetch_block
1473 {
1474   my $hash = shift;
1475   my $keep;
1476   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1477     Log(undef, "fetch_block run error from arv-get $hash: $!");
1478     return undef;
1479   }
1480   my $output_block = "";
1481   while (1) {
1482     my $buf;
1483     my $bytes = sysread($keep, $buf, 1024 * 1024);
1484     if (!defined $bytes) {
1485       Log(undef, "fetch_block read error from arv-get: $!");
1486       $output_block = undef;
1487       last;
1488     } elsif ($bytes == 0) {
1489       # sysread returns 0 at the end of the pipe.
1490       last;
1491     } else {
1492       # some bytes were read into buf.
1493       $output_block .= $buf;
1494     }
1495   }
1496   close $keep;
1497   if ($?) {
1498     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1499     $output_block = undef;
1500   }
1501   return $output_block;
1502 }
1503
1504 # Create a collection by concatenating the output of all tasks (each
1505 # task's output is either a manifest fragment, a locator for a
1506 # manifest fragment stored in Keep, or nothing at all). Return the
1507 # portable_data_hash of the new collection.
1508 sub create_output_collection
1509 {
1510   Log (undef, "collate");
1511
1512   my ($child_out, $child_in);
1513   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1514 import arvados
1515 import sys
1516 print (arvados.api("v1").collections().
1517        create(body={"manifest_text": sys.stdin.read()}).
1518        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1519 }, retry_count());
1520
1521   my $task_idx = -1;
1522   my $manifest_size = 0;
1523   for (@jobstep)
1524   {
1525     ++$task_idx;
1526     my $output = $_->{'arvados_task'}->{output};
1527     next if (!defined($output));
1528     my $next_write;
1529     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1530       $next_write = fetch_block($output);
1531     } else {
1532       $next_write = $output;
1533     }
1534     if (defined($next_write)) {
1535       if (!defined(syswrite($child_in, $next_write))) {
1536         # There's been an error writing.  Stop the loop.
1537         # We'll log details about the exit code later.
1538         last;
1539       } else {
1540         $manifest_size += length($next_write);
1541       }
1542     } else {
1543       my $uuid = $_->{'arvados_task'}->{'uuid'};
1544       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1545       $main::success = 0;
1546     }
1547   }
1548   close($child_in);
1549   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1550
1551   my $joboutput;
1552   my $s = IO::Select->new($child_out);
1553   if ($s->can_read(120)) {
1554     sysread($child_out, $joboutput, 1024 * 1024);
1555     waitpid($pid, 0);
1556     if ($?) {
1557       Log(undef, "output collection creation exited " . exit_status_s($?));
1558       $joboutput = undef;
1559     } else {
1560       chomp($joboutput);
1561     }
1562   } else {
1563     Log (undef, "timed out while creating output collection");
1564     foreach my $signal (2, 2, 2, 15, 15, 9) {
1565       kill($signal, $pid);
1566       last if waitpid($pid, WNOHANG) == -1;
1567       sleep(1);
1568     }
1569   }
1570   close($child_out);
1571
1572   return $joboutput;
1573 }
1574
1575 # Calls create_output_collection, logs the result, and returns it.
1576 # If that was successful, save that as the output in the job record.
1577 sub save_output_collection {
1578   my $collated_output = create_output_collection();
1579
1580   if (!$collated_output) {
1581     Log(undef, "Failed to write output collection");
1582   }
1583   else {
1584     Log(undef, "job output $collated_output");
1585     $Job->update_attributes('output' => $collated_output);
1586   }
1587   return $collated_output;
1588 }
1589
1590 sub killem
1591 {
1592   foreach (@_)
1593   {
1594     my $sig = 2;                # SIGINT first
1595     if (exists $proc{$_}->{"sent_$sig"} &&
1596         time - $proc{$_}->{"sent_$sig"} > 4)
1597     {
1598       $sig = 15;                # SIGTERM if SIGINT doesn't work
1599     }
1600     if (exists $proc{$_}->{"sent_$sig"} &&
1601         time - $proc{$_}->{"sent_$sig"} > 4)
1602     {
1603       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1604     }
1605     if (!exists $proc{$_}->{"sent_$sig"})
1606     {
1607       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1608       kill $sig, $_;
1609       select (undef, undef, undef, 0.1);
1610       if ($sig == 2)
1611       {
1612         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1613       }
1614       $proc{$_}->{"sent_$sig"} = time;
1615       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1616     }
1617   }
1618 }
1619
1620
1621 sub fhbits
1622 {
1623   my($bits);
1624   for (@_) {
1625     vec($bits,fileno($_),1) = 1;
1626   }
1627   $bits;
1628 }
1629
1630
1631 # Send log output to Keep via arv-put.
1632 #
1633 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1634 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1635 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1636 # $log_pipe_pid is the pid of the arv-put subprocess.
1637 #
1638 # The only functions that should access these variables directly are:
1639 #
1640 # log_writer_start($logfilename)
1641 #     Starts an arv-put pipe, reading data on stdin and writing it to
1642 #     a $logfilename file in an output collection.
1643 #
1644 # log_writer_read_output([$timeout])
1645 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1646 #     Passes $timeout to the select() call, with a default of 0.01.
1647 #     Returns the result of the last read() call on $log_pipe_out, or
1648 #     -1 if read() wasn't called because select() timed out.
1649 #     Only other log_writer_* functions should need to call this.
1650 #
1651 # log_writer_send($txt)
1652 #     Writes $txt to the output log collection.
1653 #
1654 # log_writer_finish()
1655 #     Closes the arv-put pipe and returns the output that it produces.
1656 #
1657 # log_writer_is_active()
1658 #     Returns a true value if there is currently a live arv-put
1659 #     process, false otherwise.
1660 #
1661 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1662     $log_pipe_pid);
1663
1664 sub log_writer_start($)
1665 {
1666   my $logfilename = shift;
1667   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1668                         'arv-put',
1669                         '--stream',
1670                         '--retries', '3',
1671                         '--filename', $logfilename,
1672                         '-');
1673   $log_pipe_out_buf = "";
1674   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1675 }
1676
1677 sub log_writer_read_output {
1678   my $timeout = shift || 0.01;
1679   my $read = -1;
1680   while ($read && $log_pipe_out_select->can_read($timeout)) {
1681     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1682                  length($log_pipe_out_buf));
1683   }
1684   if (!defined($read)) {
1685     Log(undef, "error reading log manifest from arv-put: $!");
1686   }
1687   return $read;
1688 }
1689
1690 sub log_writer_send($)
1691 {
1692   my $txt = shift;
1693   print $log_pipe_in $txt;
1694   log_writer_read_output();
1695 }
1696
1697 sub log_writer_finish()
1698 {
1699   return unless $log_pipe_pid;
1700
1701   close($log_pipe_in);
1702
1703   my $logger_failed = 0;
1704   my $read_result = log_writer_read_output(120);
1705   if ($read_result == -1) {
1706     $logger_failed = -1;
1707     Log (undef, "timed out reading from 'arv-put'");
1708   } elsif ($read_result != 0) {
1709     $logger_failed = -2;
1710     Log(undef, "failed to read arv-put log manifest to EOF");
1711   }
1712
1713   waitpid($log_pipe_pid, 0);
1714   if ($?) {
1715     $logger_failed ||= $?;
1716     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1717   }
1718
1719   close($log_pipe_out);
1720   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1721   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1722       $log_pipe_out_select = undef;
1723
1724   return $arv_put_output;
1725 }
1726
1727 sub log_writer_is_active() {
1728   return $log_pipe_pid;
1729 }
1730
1731 sub Log                         # ($jobstepidx, $logmessage)
1732 {
1733   my ($jobstepidx, $logmessage) = @_;
1734   if ($logmessage =~ /\n/) {
1735     for my $line (split (/\n/, $_[1])) {
1736       Log ($jobstepidx, $line);
1737     }
1738     return;
1739   }
1740   my $fh = select STDERR; $|=1; select $fh;
1741   my $task_qseq = '';
1742   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1743     $task_qseq = $jobstepidx;
1744   }
1745   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1746   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1747   $message .= "\n";
1748   my $datetime;
1749   if (log_writer_is_active() || -t STDERR) {
1750     my @gmtime = gmtime;
1751     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1752                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1753   }
1754   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1755
1756   if (log_writer_is_active()) {
1757     log_writer_send($datetime . " " . $message);
1758   }
1759 }
1760
1761
1762 sub croak
1763 {
1764   my ($package, $file, $line) = caller;
1765   my $message = "@_ at $file line $line\n";
1766   Log (undef, $message);
1767   freeze() if @jobstep_todo;
1768   create_output_collection() if @jobstep_todo;
1769   cleanup();
1770   save_meta();
1771   die;
1772 }
1773
1774
1775 sub cleanup
1776 {
1777   return unless $Job;
1778   if ($Job->{'state'} eq 'Cancelled') {
1779     $Job->update_attributes('finished_at' => scalar gmtime);
1780   } else {
1781     $Job->update_attributes('state' => 'Failed');
1782   }
1783 }
1784
1785
1786 sub save_meta
1787 {
1788   my $justcheckpoint = shift; # false if this will be the last meta saved
1789   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1790   return unless log_writer_is_active();
1791   my $log_manifest = log_writer_finish();
1792   return unless defined($log_manifest);
1793
1794   if ($Job->{log}) {
1795     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1796     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1797   }
1798
1799   my $log_coll = api_call(
1800     "collections/create", ensure_unique_name => 1, collection => {
1801       manifest_text => $log_manifest,
1802       owner_uuid => $Job->{owner_uuid},
1803       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1804     });
1805   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1806   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1807 }
1808
1809
1810 sub freeze_if_want_freeze
1811 {
1812   if ($main::please_freeze)
1813   {
1814     release_allocation();
1815     if (@_)
1816     {
1817       # kill some srun procs before freeze+stop
1818       map { $proc{$_} = {} } @_;
1819       while (%proc)
1820       {
1821         killem (keys %proc);
1822         select (undef, undef, undef, 0.1);
1823         my $died;
1824         while (($died = waitpid (-1, WNOHANG)) > 0)
1825         {
1826           delete $proc{$died};
1827         }
1828       }
1829     }
1830     freeze();
1831     create_output_collection();
1832     cleanup();
1833     save_meta();
1834     exit 1;
1835   }
1836 }
1837
1838
1839 sub freeze
1840 {
1841   Log (undef, "Freeze not implemented");
1842   return;
1843 }
1844
1845
1846 sub thaw
1847 {
1848   croak ("Thaw not implemented");
1849 }
1850
1851
1852 sub freezequote
1853 {
1854   my $s = shift;
1855   $s =~ s/\\/\\\\/g;
1856   $s =~ s/\n/\\n/g;
1857   return $s;
1858 }
1859
1860
1861 sub freezeunquote
1862 {
1863   my $s = shift;
1864   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1865   return $s;
1866 }
1867
1868
1869 sub srun_sync
1870 {
1871   my $srunargs = shift;
1872   my $execargs = shift;
1873   my $opts = shift || {};
1874   my $stdin = shift;
1875
1876   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1877   Log (undef, "$label: start");
1878
1879   my ($stderr_r, $stderr_w);
1880   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1881
1882   my ($stdout_r, $stdout_w);
1883   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1884
1885   my $srunpid = fork();
1886   if ($srunpid == 0)
1887   {
1888     close($stderr_r);
1889     close($stdout_r);
1890     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1891     fcntl($stdout_w, F_SETFL, 0) or croak($!);
1892     open(STDERR, ">&", $stderr_w);
1893     open(STDOUT, ">&", $stdout_w);
1894     srun ($srunargs, $execargs, $opts, $stdin);
1895     exit (1);
1896   }
1897   close($stderr_w);
1898   close($stdout_w);
1899
1900   set_nonblocking($stderr_r);
1901   set_nonblocking($stdout_r);
1902
1903   # Add entries to @jobstep and %proc so check_squeue() and
1904   # freeze_if_want_freeze() can treat it like a job task process.
1905   push @jobstep, {
1906     stderr => '',
1907     stderr_at => 0,
1908     stderr_captured => '',
1909     stdout_r => $stdout_r,
1910     stdout_captured => '',
1911   };
1912   my $jobstepidx = $#jobstep;
1913   $proc{$srunpid} = {
1914     jobstepidx => $jobstepidx,
1915   };
1916   $reader{$jobstepidx} = $stderr_r;
1917
1918   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
1919     my $busy = readfrompipes();
1920     if (!$busy || ($latest_refresh + 2 < scalar time)) {
1921       check_refresh_wanted();
1922       check_squeue();
1923     }
1924     if (!$busy) {
1925       select(undef, undef, undef, 0.1);
1926     }
1927     killem(keys %proc) if $main::please_freeze;
1928   }
1929   my $exited = $?;
1930
1931   1 while readfrompipes();
1932   process_stderr_final ($jobstepidx);
1933
1934   Log (undef, "$label: exit ".exit_status_s($exited));
1935
1936   close($stdout_r);
1937   close($stderr_r);
1938   delete $proc{$srunpid};
1939   delete $reader{$jobstepidx};
1940
1941   my $j = pop @jobstep;
1942   return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
1943 }
1944
1945
1946 sub srun
1947 {
1948   my $srunargs = shift;
1949   my $execargs = shift;
1950   my $opts = shift || {};
1951   my $stdin = shift;
1952   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1953
1954   $Data::Dumper::Terse = 1;
1955   $Data::Dumper::Indent = 0;
1956   my $show_cmd = Dumper($args);
1957   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1958   $show_cmd =~ s/\n/ /g;
1959   if ($opts->{fork}) {
1960     Log(undef, "starting: $show_cmd");
1961   } else {
1962     # This is a child process: parent is in charge of reading our
1963     # stderr and copying it to Log() if needed.
1964     warn "starting: $show_cmd\n";
1965   }
1966
1967   if (defined $stdin) {
1968     my $child = open STDIN, "-|";
1969     defined $child or die "no fork: $!";
1970     if ($child == 0) {
1971       print $stdin or die $!;
1972       close STDOUT or die $!;
1973       exit 0;
1974     }
1975   }
1976
1977   return system (@$args) if $opts->{fork};
1978
1979   exec @$args;
1980   warn "ENV size is ".length(join(" ",%ENV));
1981   die "exec failed: $!: @$args";
1982 }
1983
1984
1985 sub ban_node_by_slot {
1986   # Don't start any new jobsteps on this node for 60 seconds
1987   my $slotid = shift;
1988   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1989   $slot[$slotid]->{node}->{hold_count}++;
1990   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1991 }
1992
1993 sub must_lock_now
1994 {
1995   my ($lockfile, $error_message) = @_;
1996   open L, ">", $lockfile or croak("$lockfile: $!");
1997   if (!flock L, LOCK_EX|LOCK_NB) {
1998     croak("Can't lock $lockfile: $error_message\n");
1999   }
2000 }
2001
2002 sub find_docker_image {
2003   # Given a Keep locator, check to see if it contains a Docker image.
2004   # If so, return its stream name and Docker hash.
2005   # If not, return undef for both values.
2006   my $locator = shift;
2007   my ($streamname, $filename);
2008   my $image = api_call("collections/get", uuid => $locator);
2009   if ($image) {
2010     foreach my $line (split(/\n/, $image->{manifest_text})) {
2011       my @tokens = split(/\s+/, $line);
2012       next if (!@tokens);
2013       $streamname = shift(@tokens);
2014       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2015         if (defined($filename)) {
2016           return (undef, undef);  # More than one file in the Collection.
2017         } else {
2018           $filename = (split(/:/, $filedata, 3))[2];
2019         }
2020       }
2021     }
2022   }
2023   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
2024     return ($streamname, $1);
2025   } else {
2026     return (undef, undef);
2027   }
2028 }
2029
2030 sub retry_count {
2031   # Calculate the number of times an operation should be retried,
2032   # assuming exponential backoff, and that we're willing to retry as
2033   # long as tasks have been running.  Enforce a minimum of 3 retries.
2034   my ($starttime, $endtime, $timediff, $retries);
2035   if (@jobstep) {
2036     $starttime = $jobstep[0]->{starttime};
2037     $endtime = $jobstep[-1]->{finishtime};
2038   }
2039   if (!defined($starttime)) {
2040     $timediff = 0;
2041   } elsif (!defined($endtime)) {
2042     $timediff = time - $starttime;
2043   } else {
2044     $timediff = ($endtime - $starttime) - (time - $endtime);
2045   }
2046   if ($timediff > 0) {
2047     $retries = int(log($timediff) / log(2));
2048   } else {
2049     $retries = 1;  # Use the minimum.
2050   }
2051   return ($retries > 3) ? $retries : 3;
2052 }
2053
2054 sub retry_op {
2055   # Pass in two function references.
2056   # This method will be called with the remaining arguments.
2057   # If it dies, retry it with exponential backoff until it succeeds,
2058   # or until the current retry_count is exhausted.  After each failure
2059   # that can be retried, the second function will be called with
2060   # the current try count (0-based), next try time, and error message.
2061   my $operation = shift;
2062   my $retry_callback = shift;
2063   my $retries = retry_count();
2064   foreach my $try_count (0..$retries) {
2065     my $next_try = time + (2 ** $try_count);
2066     my $result = eval { $operation->(@_); };
2067     if (!$@) {
2068       return $result;
2069     } elsif ($try_count < $retries) {
2070       $retry_callback->($try_count, $next_try, $@);
2071       my $sleep_time = $next_try - time;
2072       sleep($sleep_time) if ($sleep_time > 0);
2073     }
2074   }
2075   # Ensure the error message ends in a newline, so Perl doesn't add
2076   # retry_op's line number to it.
2077   chomp($@);
2078   die($@ . "\n");
2079 }
2080
2081 sub api_call {
2082   # Pass in a /-separated API method name, and arguments for it.
2083   # This function will call that method, retrying as needed until
2084   # the current retry_count is exhausted, with a log on the first failure.
2085   my $method_name = shift;
2086   my $log_api_retry = sub {
2087     my ($try_count, $next_try_at, $errmsg) = @_;
2088     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2089     $errmsg =~ s/\s/ /g;
2090     $errmsg =~ s/\s+$//;
2091     my $retry_msg;
2092     if ($next_try_at < time) {
2093       $retry_msg = "Retrying.";
2094     } else {
2095       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2096       $retry_msg = "Retrying at $next_try_fmt.";
2097     }
2098     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2099   };
2100   my $method = $arv;
2101   foreach my $key (split(/\//, $method_name)) {
2102     $method = $method->{$key};
2103   }
2104   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2105 }
2106
2107 sub exit_status_s {
2108   # Given a $?, return a human-readable exit code string like "0" or
2109   # "1" or "0 with signal 1" or "1 with signal 11".
2110   my $exitcode = shift;
2111   my $s = $exitcode >> 8;
2112   if ($exitcode & 0x7f) {
2113     $s .= " with signal " . ($exitcode & 0x7f);
2114   }
2115   if ($exitcode & 0x80) {
2116     $s .= " with core dump";
2117   }
2118   return $s;
2119 }
2120
2121 sub handle_readall {
2122   # Pass in a glob reference to a file handle.
2123   # Read all its contents and return them as a string.
2124   my $fh_glob_ref = shift;
2125   local $/ = undef;
2126   return <$fh_glob_ref>;
2127 }
2128
2129 sub tar_filename_n {
2130   my $n = shift;
2131   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2132 }
2133
2134 sub add_git_archive {
2135   # Pass in a git archive command as a string or list, a la system().
2136   # This method will save its output to be included in the archive sent to the
2137   # build script.
2138   my $git_input;
2139   $git_tar_count++;
2140   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2141     croak("Failed to save git archive: $!");
2142   }
2143   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2144   close($git_input);
2145   waitpid($git_pid, 0);
2146   close(GIT_ARCHIVE);
2147   if ($?) {
2148     croak("Failed to save git archive: git exited " . exit_status_s($?));
2149   }
2150 }
2151
2152 sub combined_git_archive {
2153   # Combine all saved tar archives into a single archive, then return its
2154   # contents in a string.  Return undef if no archives have been saved.
2155   if ($git_tar_count < 1) {
2156     return undef;
2157   }
2158   my $base_tar_name = tar_filename_n(1);
2159   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2160     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2161     if ($tar_exit != 0) {
2162       croak("Error preparing build archive: tar -A exited " .
2163             exit_status_s($tar_exit));
2164     }
2165   }
2166   if (!open(GIT_TAR, "<", $base_tar_name)) {
2167     croak("Could not open build archive: $!");
2168   }
2169   my $tar_contents = handle_readall(\*GIT_TAR);
2170   close(GIT_TAR);
2171   return $tar_contents;
2172 }
2173
2174 sub set_nonblocking {
2175   my $fh = shift;
2176   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2177   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2178 }
2179
2180 __DATA__
2181 #!/usr/bin/env perl
2182 #
2183 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2184 # server invokes this script on individual compute nodes, or localhost if we're
2185 # running a job locally.  It gets called in two modes:
2186 #
2187 # * No arguments: Installation mode.  Read a tar archive from the DATA
2188 #   file handle; it includes the Crunch script's source code, and
2189 #   maybe SDKs as well.  Those should be installed in the proper
2190 #   locations.  This runs outside of any Docker container, so don't try to
2191 #   introspect Crunch's runtime environment.
2192 #
2193 # * With arguments: Crunch script run mode.  This script should set up the
2194 #   environment, then run the command specified in the arguments.  This runs
2195 #   inside any Docker container.
2196
2197 use Fcntl ':flock';
2198 use File::Path qw( make_path remove_tree );
2199 use POSIX qw(getcwd);
2200
2201 use constant TASK_TEMPFAIL => 111;
2202
2203 # Map SDK subdirectories to the path environments they belong to.
2204 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2205
2206 my $destdir = $ENV{"CRUNCH_SRC"};
2207 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2208 my $repo = $ENV{"CRUNCH_SRC_URL"};
2209 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2210 my $job_work = $ENV{"JOB_WORK"};
2211 my $task_work = $ENV{"TASK_WORK"};
2212
2213 open(STDOUT_ORIG, ">&", STDOUT);
2214 open(STDERR_ORIG, ">&", STDERR);
2215
2216 for my $dir ($destdir, $job_work, $task_work) {
2217   if ($dir) {
2218     make_path $dir;
2219     -e $dir or die "Failed to create temporary directory ($dir): $!";
2220   }
2221 }
2222
2223 if ($task_work) {
2224   remove_tree($task_work, {keep_root => 1});
2225 }
2226
2227 ### Crunch script run mode
2228 if (@ARGV) {
2229   # We want to do routine logging during task 0 only.  This gives the user
2230   # the information they need, but avoids repeating the information for every
2231   # task.
2232   my $Log;
2233   if ($ENV{TASK_SEQUENCE} eq "0") {
2234     $Log = sub {
2235       my $msg = shift;
2236       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2237     };
2238   } else {
2239     $Log = sub { };
2240   }
2241
2242   my $python_src = "$install_dir/python";
2243   my $venv_dir = "$job_work/.arvados.venv";
2244   my $venv_built = -e "$venv_dir/bin/activate";
2245   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2246     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2247                  "--python=python2.7", $venv_dir);
2248     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2249     $venv_built = 1;
2250     $Log->("Built Python SDK virtualenv");
2251   }
2252
2253   my @pysdk_version_cmd = ("python", "-c",
2254     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2255   if ($venv_built) {
2256     $Log->("Running in Python SDK virtualenv");
2257     @pysdk_version_cmd = ();
2258     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2259     @ARGV = ("/bin/sh", "-ec",
2260              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2261   } elsif (-d $python_src) {
2262     $Log->("Warning: virtualenv not found inside Docker container default " .
2263            "\$PATH. Can't install Python SDK.");
2264   }
2265
2266   if (@pysdk_version_cmd) {
2267     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2268     my $pysdk_version = <$pysdk_version_pipe>;
2269     close($pysdk_version_pipe);
2270     if ($? == 0) {
2271       chomp($pysdk_version);
2272       $Log->("Using Arvados SDK version $pysdk_version");
2273     } else {
2274       # A lot could've gone wrong here, but pretty much all of it means that
2275       # Python won't be able to load the Arvados SDK.
2276       $Log->("Warning: Arvados SDK not found");
2277     }
2278   }
2279
2280   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2281     my $sdk_path = "$install_dir/$sdk_dir";
2282     if (-d $sdk_path) {
2283       if ($ENV{$sdk_envkey}) {
2284         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2285       } else {
2286         $ENV{$sdk_envkey} = $sdk_path;
2287       }
2288       $Log->("Arvados SDK added to %s", $sdk_envkey);
2289     }
2290   }
2291
2292   exec(@ARGV);
2293   die "Cannot exec `@ARGV`: $!";
2294 }
2295
2296 ### Installation mode
2297 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2298 flock L, LOCK_EX;
2299 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2300   # This exact git archive (source + arvados sdk) is already installed
2301   # here, so there's no need to reinstall it.
2302
2303   # We must consume our DATA section, though: otherwise the process
2304   # feeding it to us will get SIGPIPE.
2305   my $buf;
2306   while (read(DATA, $buf, 65536)) { }
2307
2308   exit(0);
2309 }
2310
2311 unlink "$destdir.archive_hash";
2312 mkdir $destdir;
2313
2314 do {
2315   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2316   local $SIG{PIPE} = "IGNORE";
2317   warn "Extracting archive: $archive_hash\n";
2318   # --ignore-zeros is necessary sometimes: depending on how much NUL
2319   # padding tar -A put on our combined archive (which in turn depends
2320   # on the length of the component archives) tar without
2321   # --ignore-zeros will exit before consuming stdin and cause close()
2322   # to fail on the resulting SIGPIPE.
2323   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2324     die "Error launching 'tar -xC $destdir': $!";
2325   }
2326   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2327   # get SIGPIPE.  We must feed it data incrementally.
2328   my $tar_input;
2329   while (read(DATA, $tar_input, 65536)) {
2330     print TARX $tar_input;
2331   }
2332   if(!close(TARX)) {
2333     die "'tar -xC $destdir' exited $?: $!";
2334   }
2335 };
2336
2337 mkdir $install_dir;
2338
2339 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2340 if (-d $sdk_root) {
2341   foreach my $sdk_lang (("python",
2342                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2343     if (-d "$sdk_root/$sdk_lang") {
2344       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2345         die "Failed to install $sdk_lang SDK: $!";
2346       }
2347     }
2348   }
2349 }
2350
2351 my $python_dir = "$install_dir/python";
2352 if ((-d $python_dir) and can_run("python2.7")) {
2353   open(my $egg_info_pipe, "-|",
2354        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2355   my @egg_info_errors = <$egg_info_pipe>;
2356   close($egg_info_pipe);
2357
2358   if ($?) {
2359     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2360       # egg_info apparently failed because it couldn't ask git for a build tag.
2361       # Specify no build tag.
2362       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2363       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2364       close($pysdk_cfg);
2365     } else {
2366       my $egg_info_exit = $? >> 8;
2367       foreach my $errline (@egg_info_errors) {
2368         warn $errline;
2369       }
2370       warn "python setup.py egg_info failed: exit $egg_info_exit";
2371       exit ($egg_info_exit || 1);
2372     }
2373   }
2374 }
2375
2376 # Hide messages from the install script (unless it fails: shell_or_die
2377 # will show $destdir.log in that case).
2378 open(STDOUT, ">>", "$destdir.log");
2379 open(STDERR, ">&", STDOUT);
2380
2381 if (-e "$destdir/crunch_scripts/install") {
2382     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2383 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2384     # Old version
2385     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2386 } elsif (-e "./install.sh") {
2387     shell_or_die (undef, "./install.sh", $install_dir);
2388 }
2389
2390 if ($archive_hash) {
2391     unlink "$destdir.archive_hash.new";
2392     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2393     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2394 }
2395
2396 close L;
2397
2398 sub can_run {
2399   my $command_name = shift;
2400   open(my $which, "-|", "which", $command_name);
2401   while (<$which>) { }
2402   close($which);
2403   return ($? == 0);
2404 }
2405
2406 sub shell_or_die
2407 {
2408   my $exitcode = shift;
2409
2410   if ($ENV{"DEBUG"}) {
2411     print STDERR "@_\n";
2412   }
2413   if (system (@_) != 0) {
2414     my $err = $!;
2415     my $code = $?;
2416     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2417     open STDERR, ">&STDERR_ORIG";
2418     system ("cat $destdir.log >&2");
2419     warn "@_ failed ($err): $exitstatus";
2420     if (defined($exitcode)) {
2421       exit $exitcode;
2422     }
2423     else {
2424       exit (($code >> 8) || 1);
2425     }
2426   }
2427 }
2428
2429 __DATA__