4410: Crunch retries jobs when all SLURM nodes fail.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "/usr/bin/docker.io";
130 GetOptions('force-unlock' => \$force_unlock,
131            'git-dir=s' => \$git_dir,
132            'job=s' => \$jobspec,
133            'job-api-token=s' => \$job_api_token,
134            'no-clear-tmp' => \$no_clear_tmp,
135            'resume-stash=s' => \$resume_stash,
136            'docker-bin=s' => \$docker_bin,
137     );
138
139 if (defined $job_api_token) {
140   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
141 }
142
143 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
144
145
146 $SIG{'USR1'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 1;
149 };
150 $SIG{'USR2'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 0;
153 };
154
155 my $arv = Arvados->new('apiVersion' => 'v1');
156
157 my $Job;
158 my $job_id;
159 my $dbh;
160 my $sth;
161 my @jobstep;
162
163 my $local_job;
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   $local_job = 0;
169 }
170 else
171 {
172   $Job = JSON::decode_json($jobspec);
173   $local_job = 1;
174 }
175
176
177 # Make sure our workers (our slurm nodes, localhost, or whatever) are
178 # at least able to run basic commands: they aren't down or severely
179 # misconfigured.
180 my $cmd = ['true'];
181 if ($Job->{docker_image_locator}) {
182   $cmd = [$docker_bin, 'ps', '-q'];
183 }
184 Log(undef, "Sanity check is `@$cmd`");
185 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
186      $cmd,
187      {fork => 1});
188 if ($? != 0) {
189   Log(undef, "Sanity check failed: ".exit_status_s($?));
190   exit EX_TEMPFAIL;
191 }
192 Log(undef, "Sanity check OK");
193
194
195 my $User = api_call("users/current");
196
197 if (!$local_job) {
198   if (!$force_unlock) {
199     # Claim this job, and make sure nobody else does
200     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
201     if ($@) {
202       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
203       exit EX_TEMPFAIL;
204     };
205   }
206 }
207 else
208 {
209   if (!$resume_stash)
210   {
211     map { croak ("No $_ specified") unless $Job->{$_} }
212     qw(script script_version script_parameters);
213   }
214
215   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
216   $Job->{'started_at'} = gmtime;
217   $Job->{'state'} = 'Running';
218
219   $Job = api_call("jobs/create", job => $Job);
220 }
221 $job_id = $Job->{'uuid'};
222
223 my $keep_logfile = $job_id . '.log.txt';
224 log_writer_start($keep_logfile);
225
226 $Job->{'runtime_constraints'} ||= {};
227 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
228 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
229
230 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
231 if ($? == 0) {
232   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
233   chomp($gem_versions);
234   chop($gem_versions);  # Closing parentheses
235 } else {
236   $gem_versions = "";
237 }
238 Log(undef,
239     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
240
241 Log (undef, "check slurm allocation");
242 my @slot;
243 my @node;
244 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
245 my @sinfo;
246 if (!$have_slurm)
247 {
248   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
249   push @sinfo, "$localcpus localhost";
250 }
251 if (exists $ENV{SLURM_NODELIST})
252 {
253   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
254 }
255 foreach (@sinfo)
256 {
257   my ($ncpus, $slurm_nodelist) = split;
258   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
259
260   my @nodelist;
261   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
262   {
263     my $nodelist = $1;
264     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
265     {
266       my $ranges = $1;
267       foreach (split (",", $ranges))
268       {
269         my ($a, $b);
270         if (/(\d+)-(\d+)/)
271         {
272           $a = $1;
273           $b = $2;
274         }
275         else
276         {
277           $a = $_;
278           $b = $_;
279         }
280         push @nodelist, map {
281           my $n = $nodelist;
282           $n =~ s/\[[-,\d]+\]/$_/;
283           $n;
284         } ($a..$b);
285       }
286     }
287     else
288     {
289       push @nodelist, $nodelist;
290     }
291   }
292   foreach my $nodename (@nodelist)
293   {
294     Log (undef, "node $nodename - $ncpus slots");
295     my $node = { name => $nodename,
296                  ncpus => $ncpus,
297                  losing_streak => 0,
298                  hold_until => 0 };
299     foreach my $cpu (1..$ncpus)
300     {
301       push @slot, { node => $node,
302                     cpu => $cpu };
303     }
304   }
305   push @node, @nodelist;
306 }
307
308
309
310 # Ensure that we get one jobstep running on each allocated node before
311 # we start overloading nodes with concurrent steps
312
313 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
314
315
316 $Job->update_attributes(
317   'tasks_summary' => { 'failed' => 0,
318                        'todo' => 1,
319                        'running' => 0,
320                        'done' => 0 });
321
322 Log (undef, "start");
323 $SIG{'INT'} = sub { $main::please_freeze = 1; };
324 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
325 $SIG{'TERM'} = \&croak;
326 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
327 $SIG{'ALRM'} = sub { $main::please_info = 1; };
328 $SIG{'CONT'} = sub { $main::please_continue = 1; };
329 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
330
331 $main::please_freeze = 0;
332 $main::please_info = 0;
333 $main::please_continue = 0;
334 $main::please_refresh = 0;
335 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
336
337 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
338 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
339 $ENV{"JOB_UUID"} = $job_id;
340
341
342 my @jobstep_todo = ();
343 my @jobstep_done = ();
344 my @jobstep_tomerge = ();
345 my $jobstep_tomerge_level = 0;
346 my $squeue_checked = 0;
347 my $latest_refresh = scalar time;
348
349
350
351 if (defined $Job->{thawedfromkey})
352 {
353   thaw ($Job->{thawedfromkey});
354 }
355 else
356 {
357   my $first_task = api_call("job_tasks/create", job_task => {
358     'job_uuid' => $Job->{'uuid'},
359     'sequence' => 0,
360     'qsequence' => 0,
361     'parameters' => {},
362   });
363   push @jobstep, { 'level' => 0,
364                    'failures' => 0,
365                    'arvados_task' => $first_task,
366                  };
367   push @jobstep_todo, 0;
368 }
369
370
371 if (!$have_slurm)
372 {
373   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
374 }
375
376 my $build_script = handle_readall(\*DATA);
377 my $nodelist = join(",", @node);
378 my $git_tar_count = 0;
379
380 if (!defined $no_clear_tmp) {
381   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
382   Log (undef, "Clean work dirs");
383
384   my $cleanpid = fork();
385   if ($cleanpid == 0)
386   {
387     # Find FUSE mounts that look like Keep mounts (the mount path has the
388     # word "keep") and unmount them.  Then clean up work directories.
389     # TODO: When #5036 is done and widely deployed, we can get rid of the
390     # regular expression and just unmount everything with type fuse.keep.
391     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
392           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
393     exit (1);
394   }
395   while (1)
396   {
397     last if $cleanpid == waitpid (-1, WNOHANG);
398     freeze_if_want_freeze ($cleanpid);
399     select (undef, undef, undef, 0.1);
400   }
401   Log (undef, "Cleanup command exited ".exit_status_s($?));
402 }
403
404 # If this job requires a Docker image, install that.
405 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
406 if ($docker_locator = $Job->{docker_image_locator}) {
407   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
408   if (!$docker_hash)
409   {
410     croak("No Docker image hash found from locator $docker_locator");
411   }
412   $docker_stream =~ s/^\.//;
413   my $docker_install_script = qq{
414 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
415     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
416 fi
417 };
418   my $docker_pid = fork();
419   if ($docker_pid == 0)
420   {
421     srun (["srun", "--nodelist=" . join(',', @node)],
422           ["/bin/sh", "-ec", $docker_install_script]);
423     exit ($?);
424   }
425   while (1)
426   {
427     last if $docker_pid == waitpid (-1, WNOHANG);
428     freeze_if_want_freeze ($docker_pid);
429     select (undef, undef, undef, 0.1);
430   }
431   if ($? != 0)
432   {
433     croak("Installing Docker image from $docker_locator exited "
434           .exit_status_s($?));
435   }
436
437   # Determine whether this version of Docker supports memory+swap limits.
438   srun(["srun", "--nodelist=" . $node[0]],
439        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
440       {fork => 1});
441   $docker_limitmem = ($? == 0);
442
443   if ($Job->{arvados_sdk_version}) {
444     # The job also specifies an Arvados SDK version.  Add the SDKs to the
445     # tar file for the build script to install.
446     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
447                        $Job->{arvados_sdk_version}));
448     add_git_archive("git", "--git-dir=$git_dir", "archive",
449                     "--prefix=.arvados.sdk/",
450                     $Job->{arvados_sdk_version}, "sdk");
451   }
452 }
453
454 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
455   # If script_version looks like an absolute path, *and* the --git-dir
456   # argument was not given -- which implies we were not invoked by
457   # crunch-dispatch -- we will use the given path as a working
458   # directory instead of resolving script_version to a git commit (or
459   # doing anything else with git).
460   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
461   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
462 }
463 else {
464   # Resolve the given script_version to a git commit sha1. Also, if
465   # the repository is remote, clone it into our local filesystem: this
466   # ensures "git archive" will work, and is necessary to reliably
467   # resolve a symbolic script_version like "master^".
468   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
469
470   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
471
472   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
473
474   # If we're running under crunch-dispatch, it will have already
475   # pulled the appropriate source tree into its own repository, and
476   # given us that repo's path as $git_dir.
477   #
478   # If we're running a "local" job, we might have to fetch content
479   # from a remote repository.
480   #
481   # (Currently crunch-dispatch gives a local path with --git-dir, but
482   # we might as well accept URLs there too in case it changes its
483   # mind.)
484   my $repo = $git_dir || $Job->{'repository'};
485
486   # Repository can be remote or local. If remote, we'll need to fetch it
487   # to a local dir before doing `git log` et al.
488   my $repo_location;
489
490   if ($repo =~ m{://|^[^/]*:}) {
491     # $repo is a git url we can clone, like git:// or https:// or
492     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
493     # not recognized here because distinguishing that from a local
494     # path is too fragile. If you really need something strange here,
495     # use the ssh:// form.
496     $repo_location = 'remote';
497   } elsif ($repo =~ m{^\.*/}) {
498     # $repo is a local path to a git index. We'll also resolve ../foo
499     # to ../foo/.git if the latter is a directory. To help
500     # disambiguate local paths from named hosted repositories, this
501     # form must be given as ./ or ../ if it's a relative path.
502     if (-d "$repo/.git") {
503       $repo = "$repo/.git";
504     }
505     $repo_location = 'local';
506   } else {
507     # $repo is none of the above. It must be the name of a hosted
508     # repository.
509     my $arv_repo_list = api_call("repositories/list",
510                                  'filters' => [['name','=',$repo]]);
511     my @repos_found = @{$arv_repo_list->{'items'}};
512     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
513     if ($n_found > 0) {
514       Log(undef, "Repository '$repo' -> "
515           . join(", ", map { $_->{'uuid'} } @repos_found));
516     }
517     if ($n_found != 1) {
518       croak("Error: Found $n_found repositories with name '$repo'.");
519     }
520     $repo = $repos_found[0]->{'fetch_url'};
521     $repo_location = 'remote';
522   }
523   Log(undef, "Using $repo_location repository '$repo'");
524   $ENV{"CRUNCH_SRC_URL"} = $repo;
525
526   # Resolve given script_version (we'll call that $treeish here) to a
527   # commit sha1 ($commit).
528   my $treeish = $Job->{'script_version'};
529   my $commit;
530   if ($repo_location eq 'remote') {
531     # We minimize excess object-fetching by re-using the same bare
532     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
533     # just keep adding remotes to it as needed.
534     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
535     my $gitcmd = "git --git-dir=\Q$local_repo\E";
536
537     # Set up our local repo for caching remote objects, making
538     # archives, etc.
539     if (!-d $local_repo) {
540       make_path($local_repo) or croak("Error: could not create $local_repo");
541     }
542     # This works (exits 0 and doesn't delete fetched objects) even
543     # if $local_repo is already initialized:
544     `$gitcmd init --bare`;
545     if ($?) {
546       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
547     }
548
549     # If $treeish looks like a hash (or abbrev hash) we look it up in
550     # our local cache first, since that's cheaper. (We don't want to
551     # do that with tags/branches though -- those change over time, so
552     # they should always be resolved by the remote repo.)
553     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
554       # Hide stderr because it's normal for this to fail:
555       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
556       if ($? == 0 &&
557           # Careful not to resolve a branch named abcdeff to commit 1234567:
558           $sha1 =~ /^$treeish/ &&
559           $sha1 =~ /^([0-9a-f]{40})$/s) {
560         $commit = $1;
561         Log(undef, "Commit $commit already present in $local_repo");
562       }
563     }
564
565     if (!defined $commit) {
566       # If $treeish isn't just a hash or abbrev hash, or isn't here
567       # yet, we need to fetch the remote to resolve it correctly.
568
569       # First, remove all local heads. This prevents a name that does
570       # not exist on the remote from resolving to (or colliding with)
571       # a previously fetched branch or tag (possibly from a different
572       # remote).
573       remove_tree("$local_repo/refs/heads", {keep_root => 1});
574
575       Log(undef, "Fetching objects from $repo to $local_repo");
576       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
577       if ($?) {
578         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
579       }
580     }
581
582     # Now that the data is all here, we will use our local repo for
583     # the rest of our git activities.
584     $repo = $local_repo;
585   }
586
587   my $gitcmd = "git --git-dir=\Q$repo\E";
588   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
589   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
590     croak("`$gitcmd rev-list` exited "
591           .exit_status_s($?)
592           .", '$treeish' not found. Giving up.");
593   }
594   $commit = $1;
595   Log(undef, "Version $treeish is commit $commit");
596
597   if ($commit ne $Job->{'script_version'}) {
598     # Record the real commit id in the database, frozentokey, logs,
599     # etc. -- instead of an abbreviation or a branch name which can
600     # become ambiguous or point to a different commit in the future.
601     if (!$Job->update_attributes('script_version' => $commit)) {
602       croak("Error: failed to update job's script_version attribute");
603     }
604   }
605
606   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
607   add_git_archive("$gitcmd archive ''\Q$commit\E");
608 }
609
610 my $git_archive = combined_git_archive();
611 if (!defined $git_archive) {
612   Log(undef, "Skip install phase (no git archive)");
613   if ($have_slurm) {
614     Log(undef, "Warning: This probably means workers have no source tree!");
615   }
616 }
617 else {
618   my $install_exited;
619   my $install_script_tries_left = 3;
620   for (my $attempts = 0; $attempts < 3; $attempts++) {
621     Log(undef, "Run install script on all workers");
622
623     my @srunargs = ("srun",
624                     "--nodelist=$nodelist",
625                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
626     my @execargs = ("sh", "-c",
627                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
628
629     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
630     my ($install_stderr_r, $install_stderr_w);
631     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
632     set_nonblocking($install_stderr_r);
633     my $installpid = fork();
634     if ($installpid == 0)
635     {
636       close($install_stderr_r);
637       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
638       open(STDOUT, ">&", $install_stderr_w);
639       open(STDERR, ">&", $install_stderr_w);
640       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
641       exit (1);
642     }
643     close($install_stderr_w);
644     # Tell freeze_if_want_freeze how to kill the child, otherwise the
645     # "waitpid(installpid)" loop won't get interrupted by a freeze:
646     $proc{$installpid} = {};
647     my $stderr_buf = '';
648     # Track whether anything appears on stderr other than slurm errors
649     # ("srun: ...") and the "starting: ..." message printed by the
650     # srun subroutine itself:
651     my $stderr_anything_from_script = 0;
652     my $match_our_own_errors = '^(srun: error: |starting: \[)';
653     while ($installpid != waitpid(-1, WNOHANG)) {
654       freeze_if_want_freeze ($installpid);
655       # Wait up to 0.1 seconds for something to appear on stderr, then
656       # do a non-blocking read.
657       my $bits = fhbits($install_stderr_r);
658       select ($bits, undef, $bits, 0.1);
659       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
660       {
661         while ($stderr_buf =~ /^(.*?)\n/) {
662           my $line = $1;
663           substr $stderr_buf, 0, 1+length($line), "";
664           Log(undef, "stderr $line");
665           if ($line !~ /$match_our_own_errors/) {
666             $stderr_anything_from_script = 1;
667           }
668         }
669       }
670     }
671     delete $proc{$installpid};
672     $install_exited = $?;
673     close($install_stderr_r);
674     if (length($stderr_buf) > 0) {
675       if ($stderr_buf !~ /$match_our_own_errors/) {
676         $stderr_anything_from_script = 1;
677       }
678       Log(undef, "stderr $stderr_buf")
679     }
680
681     Log (undef, "Install script exited ".exit_status_s($install_exited));
682     last if $install_exited == 0 || $main::please_freeze;
683     # If the install script fails but doesn't print an error message,
684     # the next thing anyone is likely to do is just run it again in
685     # case it was a transient problem like "slurm communication fails
686     # because the network isn't reliable enough". So we'll just do
687     # that ourselves (up to 3 attempts in total). OTOH, if there is an
688     # error message, the problem is more likely to have a real fix and
689     # we should fail the job so the fixing process can start, instead
690     # of doing 2 more attempts.
691     last if $stderr_anything_from_script;
692   }
693
694   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
695     unlink($tar_filename);
696   }
697
698   if ($install_exited != 0) {
699     croak("Giving up");
700   }
701 }
702
703 foreach (qw (script script_version script_parameters runtime_constraints))
704 {
705   Log (undef,
706        "$_ " .
707        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
708 }
709 foreach (split (/\n/, $Job->{knobs}))
710 {
711   Log (undef, "knob " . $_);
712 }
713
714
715
716 $main::success = undef;
717
718
719
720 ONELEVEL:
721
722 my $thisround_succeeded = 0;
723 my $thisround_failed = 0;
724 my $thisround_failed_multiple = 0;
725 my $working_slot_count = scalar(@slot);
726
727 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
728                        or $a <=> $b } @jobstep_todo;
729 my $level = $jobstep[$jobstep_todo[0]]->{level};
730
731 my $initial_tasks_this_level = 0;
732 foreach my $id (@jobstep_todo) {
733   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
734 }
735
736 # If the number of tasks scheduled at this level #T is smaller than the number
737 # of slots available #S, only use the first #T slots, or the first slot on
738 # each node, whichever number is greater.
739 #
740 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
741 # based on these numbers.  Using fewer slots makes more resources available
742 # to each individual task, which should normally be a better strategy when
743 # there are fewer of them running with less parallelism.
744 #
745 # Note that this calculation is not redone if the initial tasks at
746 # this level queue more tasks at the same level.  This may harm
747 # overall task throughput for that level.
748 my @freeslot;
749 if ($initial_tasks_this_level < @node) {
750   @freeslot = (0..$#node);
751 } elsif ($initial_tasks_this_level < @slot) {
752   @freeslot = (0..$initial_tasks_this_level - 1);
753 } else {
754   @freeslot = (0..$#slot);
755 }
756 my $round_num_freeslots = scalar(@freeslot);
757
758 my %round_max_slots = ();
759 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
760   my $this_slot = $slot[$freeslot[$ii]];
761   my $node_name = $this_slot->{node}->{name};
762   $round_max_slots{$node_name} ||= $this_slot->{cpu};
763   last if (scalar(keys(%round_max_slots)) >= @node);
764 }
765
766 Log(undef, "start level $level with $round_num_freeslots slots");
767 my @holdslot;
768 my %reader;
769 my $progress_is_dirty = 1;
770 my $progress_stats_updated = 0;
771
772 update_progress_stats();
773
774
775 THISROUND:
776 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
777 {
778   my $id = $jobstep_todo[$todo_ptr];
779   my $Jobstep = $jobstep[$id];
780   if ($Jobstep->{level} != $level)
781   {
782     next;
783   }
784
785   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
786   set_nonblocking($reader{$id});
787
788   my $childslot = $freeslot[0];
789   my $childnode = $slot[$childslot]->{node};
790   my $childslotname = join (".",
791                             $slot[$childslot]->{node}->{name},
792                             $slot[$childslot]->{cpu});
793
794   my $childpid = fork();
795   if ($childpid == 0)
796   {
797     $SIG{'INT'} = 'DEFAULT';
798     $SIG{'QUIT'} = 'DEFAULT';
799     $SIG{'TERM'} = 'DEFAULT';
800
801     foreach (values (%reader))
802     {
803       close($_);
804     }
805     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
806     open(STDOUT,">&writer");
807     open(STDERR,">&writer");
808
809     undef $dbh;
810     undef $sth;
811
812     delete $ENV{"GNUPGHOME"};
813     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
814     $ENV{"TASK_QSEQUENCE"} = $id;
815     $ENV{"TASK_SEQUENCE"} = $level;
816     $ENV{"JOB_SCRIPT"} = $Job->{script};
817     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
818       $param =~ tr/a-z/A-Z/;
819       $ENV{"JOB_PARAMETER_$param"} = $value;
820     }
821     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
822     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
823     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
824     $ENV{"HOME"} = $ENV{"TASK_WORK"};
825     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
826     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
827     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
828     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
829
830     $ENV{"GZIP"} = "-n";
831
832     my @srunargs = (
833       "srun",
834       "--nodelist=".$childnode->{name},
835       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
836       "--job-name=$job_id.$id.$$",
837         );
838     my $command =
839         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
840         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
841         ."&& cd $ENV{CRUNCH_TMP} "
842         # These environment variables get used explicitly later in
843         # $command.  No tool is expected to read these values directly.
844         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
845         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
846         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
847         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
848     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
849     if ($docker_hash)
850     {
851       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
852       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
853       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
854       # We only set memory limits if Docker lets us limit both memory and swap.
855       # Memory limits alone have been supported longer, but subprocesses tend
856       # to get SIGKILL if they exceed that without any swap limit set.
857       # See #5642 for additional background.
858       if ($docker_limitmem) {
859         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
860       }
861
862       # Dynamically configure the container to use the host system as its
863       # DNS server.  Get the host's global addresses from the ip command,
864       # and turn them into docker --dns options using gawk.
865       $command .=
866           q{$(ip -o address show scope global |
867               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
868
869       # The source tree and $destdir directory (which we have
870       # installed on the worker host) are available in the container,
871       # under the same path.
872       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
873       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
874
875       # Currently, we make arv-mount's mount point appear at /keep
876       # inside the container (instead of using the same path as the
877       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
878       # crunch scripts and utilities must not rely on this. They must
879       # use $TASK_KEEPMOUNT.
880       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
881       $ENV{TASK_KEEPMOUNT} = "/keep";
882
883       # TASK_WORK is almost exactly like a docker data volume: it
884       # starts out empty, is writable, and persists until no
885       # containers use it any more. We don't use --volumes-from to
886       # share it with other containers: it is only accessible to this
887       # task, and it goes away when this task stops.
888       #
889       # However, a docker data volume is writable only by root unless
890       # the mount point already happens to exist in the container with
891       # different permissions. Therefore, we [1] assume /tmp already
892       # exists in the image and is writable by the crunch user; [2]
893       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
894       # writable if they are created by docker while setting up the
895       # other --volumes); and [3] create $TASK_WORK inside the
896       # container using $build_script.
897       $command .= "--volume=/tmp ";
898       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
899       $ENV{"HOME"} = $ENV{"TASK_WORK"};
900       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
901
902       # TODO: Share a single JOB_WORK volume across all task
903       # containers on a given worker node, and delete it when the job
904       # ends (and, in case that doesn't work, when the next job
905       # starts).
906       #
907       # For now, use the same approach as TASK_WORK above.
908       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
909
910       while (my ($env_key, $env_val) = each %ENV)
911       {
912         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
913           $command .= "--env=\Q$env_key=$env_val\E ";
914         }
915       }
916       $command .= "--env=\QHOME=$ENV{HOME}\E ";
917       $command .= "\Q$docker_hash\E ";
918       $command .= "stdbuf --output=0 --error=0 ";
919       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
920     } else {
921       # Non-docker run
922       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
923       $command .= "stdbuf --output=0 --error=0 ";
924       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
925     }
926
927     my @execargs = ('bash', '-c', $command);
928     srun (\@srunargs, \@execargs, undef, $build_script);
929     # exec() failed, we assume nothing happened.
930     die "srun() failed on build script\n";
931   }
932   close("writer");
933   if (!defined $childpid)
934   {
935     close $reader{$id};
936     delete $reader{$id};
937     next;
938   }
939   shift @freeslot;
940   $proc{$childpid} = { jobstep => $id,
941                        time => time,
942                        slot => $childslot,
943                        jobstepname => "$job_id.$id.$childpid",
944                      };
945   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
946   $slot[$childslot]->{pid} = $childpid;
947
948   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
949   Log ($id, "child $childpid started on $childslotname");
950   $Jobstep->{starttime} = time;
951   $Jobstep->{node} = $childnode->{name};
952   $Jobstep->{slotindex} = $childslot;
953   delete $Jobstep->{stderr};
954   delete $Jobstep->{finishtime};
955
956   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
957   $Jobstep->{'arvados_task'}->save;
958
959   splice @jobstep_todo, $todo_ptr, 1;
960   --$todo_ptr;
961
962   $progress_is_dirty = 1;
963
964   while (!@freeslot
965          ||
966          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
967   {
968     last THISROUND if $main::please_freeze || defined($main::success);
969     if ($main::please_info)
970     {
971       $main::please_info = 0;
972       freeze();
973       create_output_collection();
974       save_meta(1);
975       update_progress_stats();
976     }
977     my $gotsome
978         = readfrompipes ()
979         + reapchildren ();
980     if (!$gotsome)
981     {
982       check_refresh_wanted();
983       check_squeue();
984       update_progress_stats();
985       select (undef, undef, undef, 0.1);
986     }
987     elsif (time - $progress_stats_updated >= 30)
988     {
989       update_progress_stats();
990     }
991     $working_slot_count = scalar(grep { $_->{node}->{losing_streak} == 0 &&
992                                         $_->{node}->{hold_count} < 4 } @slot);
993     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
994         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
995     {
996       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
997           .($thisround_failed+$thisround_succeeded)
998           .") -- giving up on this round";
999       Log (undef, $message);
1000       last THISROUND;
1001     }
1002
1003     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1004     for (my $i=$#freeslot; $i>=0; $i--) {
1005       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1006         push @holdslot, (splice @freeslot, $i, 1);
1007       }
1008     }
1009     for (my $i=$#holdslot; $i>=0; $i--) {
1010       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1011         push @freeslot, (splice @holdslot, $i, 1);
1012       }
1013     }
1014
1015     # give up if no nodes are succeeding
1016     if ($working_slot_count < 1) {
1017       Log(undef, "Every node has failed -- giving up");
1018       last THISROUND;
1019     }
1020   }
1021 }
1022
1023
1024 push @freeslot, splice @holdslot;
1025 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1026
1027
1028 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1029 while (%proc)
1030 {
1031   if ($main::please_continue) {
1032     $main::please_continue = 0;
1033     goto THISROUND;
1034   }
1035   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1036   readfrompipes ();
1037   if (!reapchildren())
1038   {
1039     check_refresh_wanted();
1040     check_squeue();
1041     update_progress_stats();
1042     select (undef, undef, undef, 0.1);
1043     killem (keys %proc) if $main::please_freeze;
1044   }
1045 }
1046
1047 update_progress_stats();
1048 freeze_if_want_freeze();
1049
1050
1051 if (!defined $main::success)
1052 {
1053   if (!@jobstep_todo) {
1054     $main::success = 1;
1055   } elsif ($working_slot_count < 1) {
1056     save_meta();
1057     exit(EX_RETRY_UNLOCKED);
1058   } elsif ($thisround_succeeded == 0 &&
1059            ($thisround_failed == 0 || $thisround_failed > 4)) {
1060     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1061     Log (undef, $message);
1062     $main::success = 0;
1063   }
1064 }
1065
1066 goto ONELEVEL if !defined $main::success;
1067
1068
1069 release_allocation();
1070 freeze();
1071 my $collated_output = &create_output_collection();
1072
1073 if (!$collated_output) {
1074   Log (undef, "Failed to write output collection");
1075 }
1076 else {
1077   Log(undef, "job output $collated_output");
1078   $Job->update_attributes('output' => $collated_output);
1079 }
1080
1081 Log (undef, "finish");
1082
1083 save_meta();
1084
1085 my $final_state;
1086 if ($collated_output && $main::success) {
1087   $final_state = 'Complete';
1088 } else {
1089   $final_state = 'Failed';
1090 }
1091 $Job->update_attributes('state' => $final_state);
1092
1093 exit (($final_state eq 'Complete') ? 0 : 1);
1094
1095
1096
1097 sub update_progress_stats
1098 {
1099   $progress_stats_updated = time;
1100   return if !$progress_is_dirty;
1101   my ($todo, $done, $running) = (scalar @jobstep_todo,
1102                                  scalar @jobstep_done,
1103                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1104   $Job->{'tasks_summary'} ||= {};
1105   $Job->{'tasks_summary'}->{'todo'} = $todo;
1106   $Job->{'tasks_summary'}->{'done'} = $done;
1107   $Job->{'tasks_summary'}->{'running'} = $running;
1108   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1109   Log (undef, "status: $done done, $running running, $todo todo");
1110   $progress_is_dirty = 0;
1111 }
1112
1113
1114
1115 sub reapchildren
1116 {
1117   my $pid = waitpid (-1, WNOHANG);
1118   return 0 if $pid <= 0;
1119
1120   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1121                   . "."
1122                   . $slot[$proc{$pid}->{slot}]->{cpu});
1123   my $jobstepid = $proc{$pid}->{jobstep};
1124   my $elapsed = time - $proc{$pid}->{time};
1125   my $Jobstep = $jobstep[$jobstepid];
1126
1127   my $childstatus = $?;
1128   my $exitvalue = $childstatus >> 8;
1129   my $exitinfo = "exit ".exit_status_s($childstatus);
1130   $Jobstep->{'arvados_task'}->reload;
1131   my $task_success = $Jobstep->{'arvados_task'}->{success};
1132
1133   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1134
1135   if (!defined $task_success) {
1136     # task did not indicate one way or the other --> fail
1137     $Jobstep->{'arvados_task'}->{success} = 0;
1138     $Jobstep->{'arvados_task'}->save;
1139     $task_success = 0;
1140   }
1141
1142   if (!$task_success)
1143   {
1144     my $temporary_fail;
1145     $temporary_fail ||= $Jobstep->{node_fail};
1146     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1147
1148     ++$thisround_failed;
1149     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1150
1151     # Check for signs of a failed or misconfigured node
1152     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1153         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1154       # Don't count this against jobstep failure thresholds if this
1155       # node is already suspected faulty and srun exited quickly
1156       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1157           $elapsed < 5) {
1158         Log ($jobstepid, "blaming failure on suspect node " .
1159              $slot[$proc{$pid}->{slot}]->{node}->{name});
1160         $temporary_fail ||= 1;
1161       }
1162       ban_node_by_slot($proc{$pid}->{slot});
1163     }
1164
1165     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1166                              ++$Jobstep->{'failures'},
1167                              $temporary_fail ? 'temporary' : 'permanent',
1168                              $elapsed));
1169
1170     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1171       # Give up on this task, and the whole job
1172       $main::success = 0;
1173     }
1174     # Put this task back on the todo queue
1175     push @jobstep_todo, $jobstepid;
1176     $Job->{'tasks_summary'}->{'failed'}++;
1177   }
1178   else
1179   {
1180     ++$thisround_succeeded;
1181     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1182     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1183     push @jobstep_done, $jobstepid;
1184     Log ($jobstepid, "success in $elapsed seconds");
1185   }
1186   $Jobstep->{exitcode} = $childstatus;
1187   $Jobstep->{finishtime} = time;
1188   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1189   $Jobstep->{'arvados_task'}->save;
1190   process_stderr ($jobstepid, $task_success);
1191   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1192                            length($Jobstep->{'arvados_task'}->{output}),
1193                            $Jobstep->{'arvados_task'}->{output}));
1194
1195   close $reader{$jobstepid};
1196   delete $reader{$jobstepid};
1197   delete $slot[$proc{$pid}->{slot}]->{pid};
1198   push @freeslot, $proc{$pid}->{slot};
1199   delete $proc{$pid};
1200
1201   if ($task_success) {
1202     # Load new tasks
1203     my $newtask_list = [];
1204     my $newtask_results;
1205     do {
1206       $newtask_results = api_call(
1207         "job_tasks/list",
1208         'where' => {
1209           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1210         },
1211         'order' => 'qsequence',
1212         'offset' => scalar(@$newtask_list),
1213       );
1214       push(@$newtask_list, @{$newtask_results->{items}});
1215     } while (@{$newtask_results->{items}});
1216     foreach my $arvados_task (@$newtask_list) {
1217       my $jobstep = {
1218         'level' => $arvados_task->{'sequence'},
1219         'failures' => 0,
1220         'arvados_task' => $arvados_task
1221       };
1222       push @jobstep, $jobstep;
1223       push @jobstep_todo, $#jobstep;
1224     }
1225   }
1226
1227   $progress_is_dirty = 1;
1228   1;
1229 }
1230
1231 sub check_refresh_wanted
1232 {
1233   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1234   if (@stat && $stat[9] > $latest_refresh) {
1235     $latest_refresh = scalar time;
1236     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1237     for my $attr ('cancelled_at',
1238                   'cancelled_by_user_uuid',
1239                   'cancelled_by_client_uuid',
1240                   'state') {
1241       $Job->{$attr} = $Job2->{$attr};
1242     }
1243     if ($Job->{'state'} ne "Running") {
1244       if ($Job->{'state'} eq "Cancelled") {
1245         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1246       } else {
1247         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1248       }
1249       $main::success = 0;
1250       $main::please_freeze = 1;
1251     }
1252   }
1253 }
1254
1255 sub check_squeue
1256 {
1257   my $last_squeue_check = $squeue_checked;
1258
1259   # Do not call `squeue` or check the kill list more than once every
1260   # 15 seconds.
1261   return if $last_squeue_check > time - 15;
1262   $squeue_checked = time;
1263
1264   # Look for children from which we haven't received stderr data since
1265   # the last squeue check. If no such children exist, all procs are
1266   # alive and there's no need to even look at squeue.
1267   #
1268   # As long as the crunchstat poll interval (10s) is shorter than the
1269   # squeue check interval (15s) this should make the squeue check an
1270   # infrequent event.
1271   my $silent_procs = 0;
1272   for my $jobstep (values %proc)
1273   {
1274     if ($jobstep->{stderr_at} < $last_squeue_check)
1275     {
1276       $silent_procs++;
1277     }
1278   }
1279   return if $silent_procs == 0;
1280
1281   # use killem() on procs whose killtime is reached
1282   while (my ($pid, $jobstep) = each %proc)
1283   {
1284     if (exists $jobstep->{killtime}
1285         && $jobstep->{killtime} <= time
1286         && $jobstep->{stderr_at} < $last_squeue_check)
1287     {
1288       my $sincewhen = "";
1289       if ($jobstep->{stderr_at}) {
1290         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1291       }
1292       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1293       killem ($pid);
1294     }
1295   }
1296
1297   if (!$have_slurm)
1298   {
1299     # here is an opportunity to check for mysterious problems with local procs
1300     return;
1301   }
1302
1303   # Get a list of steps still running.  Note: squeue(1) says --steps
1304   # selects a format (which we override anyway) and allows us to
1305   # specify which steps we're interested in (which we don't).
1306   # Importantly, it also changes the meaning of %j from "job name" to
1307   # "step name" and (although this isn't mentioned explicitly in the
1308   # docs) switches from "one line per job" mode to "one line per step"
1309   # mode. Without it, we'd just get a list of one job, instead of a
1310   # list of N steps.
1311   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1312   if ($? != 0)
1313   {
1314     Log(undef, "warning: squeue exit status $? ($!)");
1315     return;
1316   }
1317   chop @squeue;
1318
1319   # which of my jobsteps are running, according to squeue?
1320   my %ok;
1321   for my $jobstepname (@squeue)
1322   {
1323     $ok{$jobstepname} = 1;
1324   }
1325
1326   # Check for child procs >60s old and not mentioned by squeue.
1327   while (my ($pid, $jobstep) = each %proc)
1328   {
1329     if ($jobstep->{time} < time - 60
1330         && $jobstep->{jobstepname}
1331         && !exists $ok{$jobstep->{jobstepname}}
1332         && !exists $jobstep->{killtime})
1333     {
1334       # According to slurm, this task has ended (successfully or not)
1335       # -- but our srun child hasn't exited. First we must wait (30
1336       # seconds) in case this is just a race between communication
1337       # channels. Then, if our srun child process still hasn't
1338       # terminated, we'll conclude some slurm communication
1339       # error/delay has caused the task to die without notifying srun,
1340       # and we'll kill srun ourselves.
1341       $jobstep->{killtime} = time + 30;
1342       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1343     }
1344   }
1345 }
1346
1347
1348 sub release_allocation
1349 {
1350   if ($have_slurm)
1351   {
1352     Log (undef, "release job allocation");
1353     system "scancel $ENV{SLURM_JOB_ID}";
1354   }
1355 }
1356
1357
1358 sub readfrompipes
1359 {
1360   my $gotsome = 0;
1361   foreach my $job (keys %reader)
1362   {
1363     my $buf;
1364     while (0 < sysread ($reader{$job}, $buf, 8192))
1365     {
1366       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1367       $jobstep[$job]->{stderr_at} = time;
1368       $jobstep[$job]->{stderr} .= $buf;
1369       preprocess_stderr ($job);
1370       if (length ($jobstep[$job]->{stderr}) > 16384)
1371       {
1372         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1373       }
1374       $gotsome = 1;
1375     }
1376   }
1377   return $gotsome;
1378 }
1379
1380
1381 sub preprocess_stderr
1382 {
1383   my $job = shift;
1384
1385   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1386     my $line = $1;
1387     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1388     Log ($job, "stderr $line");
1389     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1390       # whoa.
1391       $main::please_freeze = 1;
1392     }
1393     elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1394       $jobstep[$job]->{node_fail} = 1;
1395       ban_node_by_slot($jobstep[$job]->{slotindex});
1396     }
1397   }
1398 }
1399
1400
1401 sub process_stderr
1402 {
1403   my $job = shift;
1404   my $task_success = shift;
1405   preprocess_stderr ($job);
1406
1407   map {
1408     Log ($job, "stderr $_");
1409   } split ("\n", $jobstep[$job]->{stderr});
1410 }
1411
1412 sub fetch_block
1413 {
1414   my $hash = shift;
1415   my $keep;
1416   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1417     Log(undef, "fetch_block run error from arv-get $hash: $!");
1418     return undef;
1419   }
1420   my $output_block = "";
1421   while (1) {
1422     my $buf;
1423     my $bytes = sysread($keep, $buf, 1024 * 1024);
1424     if (!defined $bytes) {
1425       Log(undef, "fetch_block read error from arv-get: $!");
1426       $output_block = undef;
1427       last;
1428     } elsif ($bytes == 0) {
1429       # sysread returns 0 at the end of the pipe.
1430       last;
1431     } else {
1432       # some bytes were read into buf.
1433       $output_block .= $buf;
1434     }
1435   }
1436   close $keep;
1437   if ($?) {
1438     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1439     $output_block = undef;
1440   }
1441   return $output_block;
1442 }
1443
1444 # Create a collection by concatenating the output of all tasks (each
1445 # task's output is either a manifest fragment, a locator for a
1446 # manifest fragment stored in Keep, or nothing at all). Return the
1447 # portable_data_hash of the new collection.
1448 sub create_output_collection
1449 {
1450   Log (undef, "collate");
1451
1452   my ($child_out, $child_in);
1453   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1454 import arvados
1455 import sys
1456 print (arvados.api("v1").collections().
1457        create(body={"manifest_text": sys.stdin.read()}).
1458        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1459 }, retry_count());
1460
1461   my $task_idx = -1;
1462   my $manifest_size = 0;
1463   for (@jobstep)
1464   {
1465     ++$task_idx;
1466     my $output = $_->{'arvados_task'}->{output};
1467     next if (!defined($output));
1468     my $next_write;
1469     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1470       $next_write = fetch_block($output);
1471     } else {
1472       $next_write = $output;
1473     }
1474     if (defined($next_write)) {
1475       if (!defined(syswrite($child_in, $next_write))) {
1476         # There's been an error writing.  Stop the loop.
1477         # We'll log details about the exit code later.
1478         last;
1479       } else {
1480         $manifest_size += length($next_write);
1481       }
1482     } else {
1483       my $uuid = $_->{'arvados_task'}->{'uuid'};
1484       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1485       $main::success = 0;
1486     }
1487   }
1488   close($child_in);
1489   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1490
1491   my $joboutput;
1492   my $s = IO::Select->new($child_out);
1493   if ($s->can_read(120)) {
1494     sysread($child_out, $joboutput, 1024 * 1024);
1495     waitpid($pid, 0);
1496     if ($?) {
1497       Log(undef, "output collection creation exited " . exit_status_s($?));
1498       $joboutput = undef;
1499     } else {
1500       chomp($joboutput);
1501     }
1502   } else {
1503     Log (undef, "timed out while creating output collection");
1504     foreach my $signal (2, 2, 2, 15, 15, 9) {
1505       kill($signal, $pid);
1506       last if waitpid($pid, WNOHANG) == -1;
1507       sleep(1);
1508     }
1509   }
1510   close($child_out);
1511
1512   return $joboutput;
1513 }
1514
1515
1516 sub killem
1517 {
1518   foreach (@_)
1519   {
1520     my $sig = 2;                # SIGINT first
1521     if (exists $proc{$_}->{"sent_$sig"} &&
1522         time - $proc{$_}->{"sent_$sig"} > 4)
1523     {
1524       $sig = 15;                # SIGTERM if SIGINT doesn't work
1525     }
1526     if (exists $proc{$_}->{"sent_$sig"} &&
1527         time - $proc{$_}->{"sent_$sig"} > 4)
1528     {
1529       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1530     }
1531     if (!exists $proc{$_}->{"sent_$sig"})
1532     {
1533       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1534       kill $sig, $_;
1535       select (undef, undef, undef, 0.1);
1536       if ($sig == 2)
1537       {
1538         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1539       }
1540       $proc{$_}->{"sent_$sig"} = time;
1541       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1542     }
1543   }
1544 }
1545
1546
1547 sub fhbits
1548 {
1549   my($bits);
1550   for (@_) {
1551     vec($bits,fileno($_),1) = 1;
1552   }
1553   $bits;
1554 }
1555
1556
1557 # Send log output to Keep via arv-put.
1558 #
1559 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1560 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1561 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1562 # $log_pipe_pid is the pid of the arv-put subprocess.
1563 #
1564 # The only functions that should access these variables directly are:
1565 #
1566 # log_writer_start($logfilename)
1567 #     Starts an arv-put pipe, reading data on stdin and writing it to
1568 #     a $logfilename file in an output collection.
1569 #
1570 # log_writer_read_output([$timeout])
1571 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1572 #     Passes $timeout to the select() call, with a default of 0.01.
1573 #     Returns the result of the last read() call on $log_pipe_out, or
1574 #     -1 if read() wasn't called because select() timed out.
1575 #     Only other log_writer_* functions should need to call this.
1576 #
1577 # log_writer_send($txt)
1578 #     Writes $txt to the output log collection.
1579 #
1580 # log_writer_finish()
1581 #     Closes the arv-put pipe and returns the output that it produces.
1582 #
1583 # log_writer_is_active()
1584 #     Returns a true value if there is currently a live arv-put
1585 #     process, false otherwise.
1586 #
1587 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1588     $log_pipe_pid);
1589
1590 sub log_writer_start($)
1591 {
1592   my $logfilename = shift;
1593   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1594                         'arv-put',
1595                         '--stream',
1596                         '--retries', '3',
1597                         '--filename', $logfilename,
1598                         '-');
1599   $log_pipe_out_buf = "";
1600   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1601 }
1602
1603 sub log_writer_read_output {
1604   my $timeout = shift || 0.01;
1605   my $read = -1;
1606   while ($read && $log_pipe_out_select->can_read($timeout)) {
1607     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1608                  length($log_pipe_out_buf));
1609   }
1610   if (!defined($read)) {
1611     Log(undef, "error reading log manifest from arv-put: $!");
1612   }
1613   return $read;
1614 }
1615
1616 sub log_writer_send($)
1617 {
1618   my $txt = shift;
1619   print $log_pipe_in $txt;
1620   log_writer_read_output();
1621 }
1622
1623 sub log_writer_finish()
1624 {
1625   return unless $log_pipe_pid;
1626
1627   close($log_pipe_in);
1628
1629   my $read_result = log_writer_read_output(120);
1630   if ($read_result == -1) {
1631     Log (undef, "timed out reading from 'arv-put'");
1632   } elsif ($read_result != 0) {
1633     Log(undef, "failed to read arv-put log manifest to EOF");
1634   }
1635
1636   waitpid($log_pipe_pid, 0);
1637   if ($?) {
1638     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1639   }
1640
1641   close($log_pipe_out);
1642   my $arv_put_output = $log_pipe_out_buf;
1643   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1644       $log_pipe_out_select = undef;
1645
1646   return $arv_put_output;
1647 }
1648
1649 sub log_writer_is_active() {
1650   return $log_pipe_pid;
1651 }
1652
1653 sub Log                         # ($jobstep_id, $logmessage)
1654 {
1655   if ($_[1] =~ /\n/) {
1656     for my $line (split (/\n/, $_[1])) {
1657       Log ($_[0], $line);
1658     }
1659     return;
1660   }
1661   my $fh = select STDERR; $|=1; select $fh;
1662   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1663   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1664   $message .= "\n";
1665   my $datetime;
1666   if (log_writer_is_active() || -t STDERR) {
1667     my @gmtime = gmtime;
1668     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1669                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1670   }
1671   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1672
1673   if (log_writer_is_active()) {
1674     log_writer_send($datetime . " " . $message);
1675   }
1676 }
1677
1678
1679 sub croak
1680 {
1681   my ($package, $file, $line) = caller;
1682   my $message = "@_ at $file line $line\n";
1683   Log (undef, $message);
1684   freeze() if @jobstep_todo;
1685   create_output_collection() if @jobstep_todo;
1686   cleanup();
1687   save_meta();
1688   die;
1689 }
1690
1691
1692 sub cleanup
1693 {
1694   return unless $Job;
1695   if ($Job->{'state'} eq 'Cancelled') {
1696     $Job->update_attributes('finished_at' => scalar gmtime);
1697   } else {
1698     $Job->update_attributes('state' => 'Failed');
1699   }
1700 }
1701
1702
1703 sub save_meta
1704 {
1705   my $justcheckpoint = shift; # false if this will be the last meta saved
1706   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1707   return unless log_writer_is_active();
1708
1709   my $log_manifest = "";
1710   if ($Job->{log}) {
1711     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1712     $log_manifest .= $prev_log_coll->{manifest_text};
1713   }
1714   $log_manifest .= log_writer_finish();
1715
1716   my $log_coll = api_call(
1717     "collections/create", ensure_unique_name => 1, collection => {
1718       manifest_text => $log_manifest,
1719       owner_uuid => $Job->{owner_uuid},
1720       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1721     });
1722   Log(undef, "log manifest is " . $log_coll->{portable_data_hash});
1723   $Job->{'log'} = $log_coll->{portable_data_hash};
1724   $Job->update_attributes('log', $log_coll->{portable_data_hash});
1725 }
1726
1727
1728 sub freeze_if_want_freeze
1729 {
1730   if ($main::please_freeze)
1731   {
1732     release_allocation();
1733     if (@_)
1734     {
1735       # kill some srun procs before freeze+stop
1736       map { $proc{$_} = {} } @_;
1737       while (%proc)
1738       {
1739         killem (keys %proc);
1740         select (undef, undef, undef, 0.1);
1741         my $died;
1742         while (($died = waitpid (-1, WNOHANG)) > 0)
1743         {
1744           delete $proc{$died};
1745         }
1746       }
1747     }
1748     freeze();
1749     create_output_collection();
1750     cleanup();
1751     save_meta();
1752     exit 1;
1753   }
1754 }
1755
1756
1757 sub freeze
1758 {
1759   Log (undef, "Freeze not implemented");
1760   return;
1761 }
1762
1763
1764 sub thaw
1765 {
1766   croak ("Thaw not implemented");
1767 }
1768
1769
1770 sub freezequote
1771 {
1772   my $s = shift;
1773   $s =~ s/\\/\\\\/g;
1774   $s =~ s/\n/\\n/g;
1775   return $s;
1776 }
1777
1778
1779 sub freezeunquote
1780 {
1781   my $s = shift;
1782   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1783   return $s;
1784 }
1785
1786
1787 sub srun
1788 {
1789   my $srunargs = shift;
1790   my $execargs = shift;
1791   my $opts = shift || {};
1792   my $stdin = shift;
1793   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1794
1795   $Data::Dumper::Terse = 1;
1796   $Data::Dumper::Indent = 0;
1797   my $show_cmd = Dumper($args);
1798   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1799   $show_cmd =~ s/\n/ /g;
1800   if ($opts->{fork}) {
1801     Log(undef, "starting: $show_cmd");
1802   } else {
1803     # This is a child process: parent is in charge of reading our
1804     # stderr and copying it to Log() if needed.
1805     warn "starting: $show_cmd\n";
1806   }
1807
1808   if (defined $stdin) {
1809     my $child = open STDIN, "-|";
1810     defined $child or die "no fork: $!";
1811     if ($child == 0) {
1812       print $stdin or die $!;
1813       close STDOUT or die $!;
1814       exit 0;
1815     }
1816   }
1817
1818   return system (@$args) if $opts->{fork};
1819
1820   exec @$args;
1821   warn "ENV size is ".length(join(" ",%ENV));
1822   die "exec failed: $!: @$args";
1823 }
1824
1825
1826 sub ban_node_by_slot {
1827   # Don't start any new jobsteps on this node for 60 seconds
1828   my $slotid = shift;
1829   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1830   $slot[$slotid]->{node}->{hold_count}++;
1831   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1832 }
1833
1834 sub must_lock_now
1835 {
1836   my ($lockfile, $error_message) = @_;
1837   open L, ">", $lockfile or croak("$lockfile: $!");
1838   if (!flock L, LOCK_EX|LOCK_NB) {
1839     croak("Can't lock $lockfile: $error_message\n");
1840   }
1841 }
1842
1843 sub find_docker_image {
1844   # Given a Keep locator, check to see if it contains a Docker image.
1845   # If so, return its stream name and Docker hash.
1846   # If not, return undef for both values.
1847   my $locator = shift;
1848   my ($streamname, $filename);
1849   my $image = api_call("collections/get", uuid => $locator);
1850   if ($image) {
1851     foreach my $line (split(/\n/, $image->{manifest_text})) {
1852       my @tokens = split(/\s+/, $line);
1853       next if (!@tokens);
1854       $streamname = shift(@tokens);
1855       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1856         if (defined($filename)) {
1857           return (undef, undef);  # More than one file in the Collection.
1858         } else {
1859           $filename = (split(/:/, $filedata, 3))[2];
1860         }
1861       }
1862     }
1863   }
1864   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1865     return ($streamname, $1);
1866   } else {
1867     return (undef, undef);
1868   }
1869 }
1870
1871 sub retry_count {
1872   # Calculate the number of times an operation should be retried,
1873   # assuming exponential backoff, and that we're willing to retry as
1874   # long as tasks have been running.  Enforce a minimum of 3 retries.
1875   my ($starttime, $endtime, $timediff, $retries);
1876   if (@jobstep) {
1877     $starttime = $jobstep[0]->{starttime};
1878     $endtime = $jobstep[-1]->{finishtime};
1879   }
1880   if (!defined($starttime)) {
1881     $timediff = 0;
1882   } elsif (!defined($endtime)) {
1883     $timediff = time - $starttime;
1884   } else {
1885     $timediff = ($endtime - $starttime) - (time - $endtime);
1886   }
1887   if ($timediff > 0) {
1888     $retries = int(log($timediff) / log(2));
1889   } else {
1890     $retries = 1;  # Use the minimum.
1891   }
1892   return ($retries > 3) ? $retries : 3;
1893 }
1894
1895 sub retry_op {
1896   # Pass in two function references.
1897   # This method will be called with the remaining arguments.
1898   # If it dies, retry it with exponential backoff until it succeeds,
1899   # or until the current retry_count is exhausted.  After each failure
1900   # that can be retried, the second function will be called with
1901   # the current try count (0-based), next try time, and error message.
1902   my $operation = shift;
1903   my $retry_callback = shift;
1904   my $retries = retry_count();
1905   foreach my $try_count (0..$retries) {
1906     my $next_try = time + (2 ** $try_count);
1907     my $result = eval { $operation->(@_); };
1908     if (!$@) {
1909       return $result;
1910     } elsif ($try_count < $retries) {
1911       $retry_callback->($try_count, $next_try, $@);
1912       my $sleep_time = $next_try - time;
1913       sleep($sleep_time) if ($sleep_time > 0);
1914     }
1915   }
1916   # Ensure the error message ends in a newline, so Perl doesn't add
1917   # retry_op's line number to it.
1918   chomp($@);
1919   die($@ . "\n");
1920 }
1921
1922 sub api_call {
1923   # Pass in a /-separated API method name, and arguments for it.
1924   # This function will call that method, retrying as needed until
1925   # the current retry_count is exhausted, with a log on the first failure.
1926   my $method_name = shift;
1927   my $log_api_retry = sub {
1928     my ($try_count, $next_try_at, $errmsg) = @_;
1929     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1930     $errmsg =~ s/\s/ /g;
1931     $errmsg =~ s/\s+$//;
1932     my $retry_msg;
1933     if ($next_try_at < time) {
1934       $retry_msg = "Retrying.";
1935     } else {
1936       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1937       $retry_msg = "Retrying at $next_try_fmt.";
1938     }
1939     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1940   };
1941   my $method = $arv;
1942   foreach my $key (split(/\//, $method_name)) {
1943     $method = $method->{$key};
1944   }
1945   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1946 }
1947
1948 sub exit_status_s {
1949   # Given a $?, return a human-readable exit code string like "0" or
1950   # "1" or "0 with signal 1" or "1 with signal 11".
1951   my $exitcode = shift;
1952   my $s = $exitcode >> 8;
1953   if ($exitcode & 0x7f) {
1954     $s .= " with signal " . ($exitcode & 0x7f);
1955   }
1956   if ($exitcode & 0x80) {
1957     $s .= " with core dump";
1958   }
1959   return $s;
1960 }
1961
1962 sub handle_readall {
1963   # Pass in a glob reference to a file handle.
1964   # Read all its contents and return them as a string.
1965   my $fh_glob_ref = shift;
1966   local $/ = undef;
1967   return <$fh_glob_ref>;
1968 }
1969
1970 sub tar_filename_n {
1971   my $n = shift;
1972   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1973 }
1974
1975 sub add_git_archive {
1976   # Pass in a git archive command as a string or list, a la system().
1977   # This method will save its output to be included in the archive sent to the
1978   # build script.
1979   my $git_input;
1980   $git_tar_count++;
1981   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1982     croak("Failed to save git archive: $!");
1983   }
1984   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1985   close($git_input);
1986   waitpid($git_pid, 0);
1987   close(GIT_ARCHIVE);
1988   if ($?) {
1989     croak("Failed to save git archive: git exited " . exit_status_s($?));
1990   }
1991 }
1992
1993 sub combined_git_archive {
1994   # Combine all saved tar archives into a single archive, then return its
1995   # contents in a string.  Return undef if no archives have been saved.
1996   if ($git_tar_count < 1) {
1997     return undef;
1998   }
1999   my $base_tar_name = tar_filename_n(1);
2000   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2001     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2002     if ($tar_exit != 0) {
2003       croak("Error preparing build archive: tar -A exited " .
2004             exit_status_s($tar_exit));
2005     }
2006   }
2007   if (!open(GIT_TAR, "<", $base_tar_name)) {
2008     croak("Could not open build archive: $!");
2009   }
2010   my $tar_contents = handle_readall(\*GIT_TAR);
2011   close(GIT_TAR);
2012   return $tar_contents;
2013 }
2014
2015 sub set_nonblocking {
2016   my $fh = shift;
2017   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2018   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2019 }
2020
2021 __DATA__
2022 #!/usr/bin/perl
2023 #
2024 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2025 # server invokes this script on individual compute nodes, or localhost if we're
2026 # running a job locally.  It gets called in two modes:
2027 #
2028 # * No arguments: Installation mode.  Read a tar archive from the DATA
2029 #   file handle; it includes the Crunch script's source code, and
2030 #   maybe SDKs as well.  Those should be installed in the proper
2031 #   locations.  This runs outside of any Docker container, so don't try to
2032 #   introspect Crunch's runtime environment.
2033 #
2034 # * With arguments: Crunch script run mode.  This script should set up the
2035 #   environment, then run the command specified in the arguments.  This runs
2036 #   inside any Docker container.
2037
2038 use Fcntl ':flock';
2039 use File::Path qw( make_path remove_tree );
2040 use POSIX qw(getcwd);
2041
2042 use constant TASK_TEMPFAIL => 111;
2043
2044 # Map SDK subdirectories to the path environments they belong to.
2045 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2046
2047 my $destdir = $ENV{"CRUNCH_SRC"};
2048 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2049 my $repo = $ENV{"CRUNCH_SRC_URL"};
2050 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2051 my $job_work = $ENV{"JOB_WORK"};
2052 my $task_work = $ENV{"TASK_WORK"};
2053
2054 open(STDOUT_ORIG, ">&", STDOUT);
2055 open(STDERR_ORIG, ">&", STDERR);
2056
2057 for my $dir ($destdir, $job_work, $task_work) {
2058   if ($dir) {
2059     make_path $dir;
2060     -e $dir or die "Failed to create temporary directory ($dir): $!";
2061   }
2062 }
2063
2064 if ($task_work) {
2065   remove_tree($task_work, {keep_root => 1});
2066 }
2067
2068 ### Crunch script run mode
2069 if (@ARGV) {
2070   # We want to do routine logging during task 0 only.  This gives the user
2071   # the information they need, but avoids repeating the information for every
2072   # task.
2073   my $Log;
2074   if ($ENV{TASK_SEQUENCE} eq "0") {
2075     $Log = sub {
2076       my $msg = shift;
2077       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2078     };
2079   } else {
2080     $Log = sub { };
2081   }
2082
2083   my $python_src = "$install_dir/python";
2084   my $venv_dir = "$job_work/.arvados.venv";
2085   my $venv_built = -e "$venv_dir/bin/activate";
2086   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2087     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2088                  "--python=python2.7", $venv_dir);
2089     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2090     $venv_built = 1;
2091     $Log->("Built Python SDK virtualenv");
2092   }
2093
2094   my $pip_bin = "pip";
2095   if ($venv_built) {
2096     $Log->("Running in Python SDK virtualenv");
2097     $pip_bin = "$venv_dir/bin/pip";
2098     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2099     @ARGV = ("/bin/sh", "-ec",
2100              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2101   } elsif (-d $python_src) {
2102     $Log->("Warning: virtualenv not found inside Docker container default " .
2103            "\$PATH. Can't install Python SDK.");
2104   }
2105
2106   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2107   if ($pkgs) {
2108     $Log->("Using Arvados SDK:");
2109     foreach my $line (split /\n/, $pkgs) {
2110       $Log->($line);
2111     }
2112   } else {
2113     $Log->("Arvados SDK packages not found");
2114   }
2115
2116   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2117     my $sdk_path = "$install_dir/$sdk_dir";
2118     if (-d $sdk_path) {
2119       if ($ENV{$sdk_envkey}) {
2120         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2121       } else {
2122         $ENV{$sdk_envkey} = $sdk_path;
2123       }
2124       $Log->("Arvados SDK added to %s", $sdk_envkey);
2125     }
2126   }
2127
2128   exec(@ARGV);
2129   die "Cannot exec `@ARGV`: $!";
2130 }
2131
2132 ### Installation mode
2133 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2134 flock L, LOCK_EX;
2135 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2136   # This exact git archive (source + arvados sdk) is already installed
2137   # here, so there's no need to reinstall it.
2138
2139   # We must consume our DATA section, though: otherwise the process
2140   # feeding it to us will get SIGPIPE.
2141   my $buf;
2142   while (read(DATA, $buf, 65536)) { }
2143
2144   exit(0);
2145 }
2146
2147 unlink "$destdir.archive_hash";
2148 mkdir $destdir;
2149
2150 do {
2151   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2152   local $SIG{PIPE} = "IGNORE";
2153   warn "Extracting archive: $archive_hash\n";
2154   # --ignore-zeros is necessary sometimes: depending on how much NUL
2155   # padding tar -A put on our combined archive (which in turn depends
2156   # on the length of the component archives) tar without
2157   # --ignore-zeros will exit before consuming stdin and cause close()
2158   # to fail on the resulting SIGPIPE.
2159   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2160     die "Error launching 'tar -xC $destdir': $!";
2161   }
2162   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2163   # get SIGPIPE.  We must feed it data incrementally.
2164   my $tar_input;
2165   while (read(DATA, $tar_input, 65536)) {
2166     print TARX $tar_input;
2167   }
2168   if(!close(TARX)) {
2169     die "'tar -xC $destdir' exited $?: $!";
2170   }
2171 };
2172
2173 mkdir $install_dir;
2174
2175 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2176 if (-d $sdk_root) {
2177   foreach my $sdk_lang (("python",
2178                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2179     if (-d "$sdk_root/$sdk_lang") {
2180       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2181         die "Failed to install $sdk_lang SDK: $!";
2182       }
2183     }
2184   }
2185 }
2186
2187 my $python_dir = "$install_dir/python";
2188 if ((-d $python_dir) and can_run("python2.7")) {
2189   open(my $egg_info_pipe, "-|",
2190        "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null");
2191   my @egg_info_errors = <$egg_info_pipe>;
2192   close($egg_info_pipe);
2193   if ($?) {
2194     if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) {
2195       # egg_info apparently failed because it couldn't ask git for a build tag.
2196       # Specify no build tag.
2197       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2198       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2199       close($pysdk_cfg);
2200     } else {
2201       my $egg_info_exit = $? >> 8;
2202       foreach my $errline (@egg_info_errors) {
2203         print STDERR_ORIG $errline;
2204       }
2205       warn "python setup.py egg_info failed: exit $egg_info_exit";
2206       exit ($egg_info_exit || 1);
2207     }
2208   }
2209 }
2210
2211 # Hide messages from the install script (unless it fails: shell_or_die
2212 # will show $destdir.log in that case).
2213 open(STDOUT, ">>", "$destdir.log");
2214 open(STDERR, ">&", STDOUT);
2215
2216 if (-e "$destdir/crunch_scripts/install") {
2217     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2218 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2219     # Old version
2220     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2221 } elsif (-e "./install.sh") {
2222     shell_or_die (undef, "./install.sh", $install_dir);
2223 }
2224
2225 if ($archive_hash) {
2226     unlink "$destdir.archive_hash.new";
2227     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2228     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2229 }
2230
2231 close L;
2232
2233 sub can_run {
2234   my $command_name = shift;
2235   open(my $which, "-|", "which", $command_name);
2236   while (<$which>) { }
2237   close($which);
2238   return ($? == 0);
2239 }
2240
2241 sub shell_or_die
2242 {
2243   my $exitcode = shift;
2244
2245   if ($ENV{"DEBUG"}) {
2246     print STDERR "@_\n";
2247   }
2248   if (system (@_) != 0) {
2249     my $err = $!;
2250     my $code = $?;
2251     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2252     open STDERR, ">&STDERR_ORIG";
2253     system ("cat $destdir.log >&2");
2254     warn "@_ failed ($err): $exitstatus";
2255     if (defined($exitcode)) {
2256       exit $exitcode;
2257     }
2258     else {
2259       exit (($code >> 8) || 1);
2260     }
2261   }
2262 }
2263
2264 __DATA__