6592: Do not overload $Job temporarily.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "/usr/bin/docker.io";
130 GetOptions('force-unlock' => \$force_unlock,
131            'git-dir=s' => \$git_dir,
132            'job=s' => \$jobspec,
133            'job-api-token=s' => \$job_api_token,
134            'no-clear-tmp' => \$no_clear_tmp,
135            'resume-stash=s' => \$resume_stash,
136            'docker-bin=s' => \$docker_bin,
137     );
138
139 if (defined $job_api_token) {
140   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
141 }
142
143 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
144
145
146 $SIG{'USR1'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 1;
149 };
150 $SIG{'USR2'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 0;
153 };
154
155 my $arv = Arvados->new('apiVersion' => 'v1');
156
157 my $Job;
158 my $job_id;
159 my $dbh;
160 my $sth;
161 my @jobstep;
162
163 my $local_job;
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   $local_job = 0;
169 }
170 else
171 {
172   $local_job = JSON::decode_json($jobspec);
173 }
174
175
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely
178 # misconfigured.
179 my $cmd = ['true'];
180 if (($Job || $local_job)->{docker_image_locator}) {
181   $cmd = [$docker_bin, 'ps', '-q'];
182 }
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
185      $cmd,
186      {fork => 1});
187 if ($? != 0) {
188   Log(undef, "Sanity check failed: ".exit_status_s($?));
189   exit EX_TEMPFAIL;
190 }
191 Log(undef, "Sanity check OK");
192
193
194 my $User = api_call("users/current");
195
196 if (!$local_job) {
197   if (!$force_unlock) {
198     # Claim this job, and make sure nobody else does
199     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
200     if ($@) {
201       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
202       exit EX_TEMPFAIL;
203     };
204   }
205 }
206 else
207 {
208   if (!$resume_stash)
209   {
210     map { croak ("No $_ specified") unless $local_job->{$_} }
211     qw(script script_version script_parameters);
212   }
213
214   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
215   $local_job->{'started_at'} = gmtime;
216   $local_job->{'state'} = 'Running';
217
218   $Job = api_call("jobs/create", job => $local_job);
219 }
220 $job_id = $Job->{'uuid'};
221
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
224
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
228
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
230 if ($? == 0) {
231   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232   chomp($gem_versions);
233   chop($gem_versions);  # Closing parentheses
234 } else {
235   $gem_versions = "";
236 }
237 Log(undef,
238     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
239
240 Log (undef, "check slurm allocation");
241 my @slot;
242 my @node;
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
244 my @sinfo;
245 if (!$have_slurm)
246 {
247   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248   push @sinfo, "$localcpus localhost";
249 }
250 if (exists $ENV{SLURM_NODELIST})
251 {
252   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
253 }
254 foreach (@sinfo)
255 {
256   my ($ncpus, $slurm_nodelist) = split;
257   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
258
259   my @nodelist;
260   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
261   {
262     my $nodelist = $1;
263     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
264     {
265       my $ranges = $1;
266       foreach (split (",", $ranges))
267       {
268         my ($a, $b);
269         if (/(\d+)-(\d+)/)
270         {
271           $a = $1;
272           $b = $2;
273         }
274         else
275         {
276           $a = $_;
277           $b = $_;
278         }
279         push @nodelist, map {
280           my $n = $nodelist;
281           $n =~ s/\[[-,\d]+\]/$_/;
282           $n;
283         } ($a..$b);
284       }
285     }
286     else
287     {
288       push @nodelist, $nodelist;
289     }
290   }
291   foreach my $nodename (@nodelist)
292   {
293     Log (undef, "node $nodename - $ncpus slots");
294     my $node = { name => $nodename,
295                  ncpus => $ncpus,
296                  # The number of consecutive times a task has been dispatched
297                  # to this node and failed.
298                  losing_streak => 0,
299                  # The number of consecutive times that SLURM has reported
300                  # a node failure since the last successful task.
301                  fail_count => 0,
302                  # Don't dispatch work to this node until this time
303                  # (in seconds since the epoch) has passed.
304                  hold_until => 0 };
305     foreach my $cpu (1..$ncpus)
306     {
307       push @slot, { node => $node,
308                     cpu => $cpu };
309     }
310   }
311   push @node, @nodelist;
312 }
313
314
315
316 # Ensure that we get one jobstep running on each allocated node before
317 # we start overloading nodes with concurrent steps
318
319 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
320
321
322 $Job->update_attributes(
323   'tasks_summary' => { 'failed' => 0,
324                        'todo' => 1,
325                        'running' => 0,
326                        'done' => 0 });
327
328 Log (undef, "start");
329 $SIG{'INT'} = sub { $main::please_freeze = 1; };
330 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
331 $SIG{'TERM'} = \&croak;
332 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
333 $SIG{'ALRM'} = sub { $main::please_info = 1; };
334 $SIG{'CONT'} = sub { $main::please_continue = 1; };
335 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
336
337 $main::please_freeze = 0;
338 $main::please_info = 0;
339 $main::please_continue = 0;
340 $main::please_refresh = 0;
341 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
342
343 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
344 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
345 $ENV{"JOB_UUID"} = $job_id;
346
347
348 my @jobstep_todo = ();
349 my @jobstep_done = ();
350 my @jobstep_tomerge = ();
351 my $jobstep_tomerge_level = 0;
352 my $squeue_checked = 0;
353 my $latest_refresh = scalar time;
354
355
356
357 if (defined $Job->{thawedfromkey})
358 {
359   thaw ($Job->{thawedfromkey});
360 }
361 else
362 {
363   my $first_task = api_call("job_tasks/create", job_task => {
364     'job_uuid' => $Job->{'uuid'},
365     'sequence' => 0,
366     'qsequence' => 0,
367     'parameters' => {},
368   });
369   push @jobstep, { 'level' => 0,
370                    'failures' => 0,
371                    'arvados_task' => $first_task,
372                  };
373   push @jobstep_todo, 0;
374 }
375
376
377 if (!$have_slurm)
378 {
379   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
380 }
381
382 my $build_script = handle_readall(\*DATA);
383 my $nodelist = join(",", @node);
384 my $git_tar_count = 0;
385
386 if (!defined $no_clear_tmp) {
387   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
388   Log (undef, "Clean work dirs");
389
390   my $cleanpid = fork();
391   if ($cleanpid == 0)
392   {
393     # Find FUSE mounts that look like Keep mounts (the mount path has the
394     # word "keep") and unmount them.  Then clean up work directories.
395     # TODO: When #5036 is done and widely deployed, we can get rid of the
396     # regular expression and just unmount everything with type fuse.keep.
397     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
399     exit (1);
400   }
401   while (1)
402   {
403     last if $cleanpid == waitpid (-1, WNOHANG);
404     freeze_if_want_freeze ($cleanpid);
405     select (undef, undef, undef, 0.1);
406   }
407   Log (undef, "Cleanup command exited ".exit_status_s($?));
408 }
409
410 # If this job requires a Docker image, install that.
411 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
412 if ($docker_locator = $Job->{docker_image_locator}) {
413   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
414   if (!$docker_hash)
415   {
416     croak("No Docker image hash found from locator $docker_locator");
417   }
418   $docker_stream =~ s/^\.//;
419   my $docker_install_script = qq{
420 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
421     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
422 fi
423 };
424   my $docker_pid = fork();
425   if ($docker_pid == 0)
426   {
427     srun (["srun", "--nodelist=" . join(',', @node)],
428           ["/bin/sh", "-ec", $docker_install_script]);
429     exit ($?);
430   }
431   while (1)
432   {
433     last if $docker_pid == waitpid (-1, WNOHANG);
434     freeze_if_want_freeze ($docker_pid);
435     select (undef, undef, undef, 0.1);
436   }
437   if ($? != 0)
438   {
439     croak("Installing Docker image from $docker_locator exited "
440           .exit_status_s($?));
441   }
442
443   # Determine whether this version of Docker supports memory+swap limits.
444   srun(["srun", "--nodelist=" . $node[0]],
445        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
446       {fork => 1});
447   $docker_limitmem = ($? == 0);
448
449   if ($Job->{arvados_sdk_version}) {
450     # The job also specifies an Arvados SDK version.  Add the SDKs to the
451     # tar file for the build script to install.
452     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
453                        $Job->{arvados_sdk_version}));
454     add_git_archive("git", "--git-dir=$git_dir", "archive",
455                     "--prefix=.arvados.sdk/",
456                     $Job->{arvados_sdk_version}, "sdk");
457   }
458 }
459
460 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
461   # If script_version looks like an absolute path, *and* the --git-dir
462   # argument was not given -- which implies we were not invoked by
463   # crunch-dispatch -- we will use the given path as a working
464   # directory instead of resolving script_version to a git commit (or
465   # doing anything else with git).
466   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
467   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
468 }
469 else {
470   # Resolve the given script_version to a git commit sha1. Also, if
471   # the repository is remote, clone it into our local filesystem: this
472   # ensures "git archive" will work, and is necessary to reliably
473   # resolve a symbolic script_version like "master^".
474   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
475
476   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
477
478   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
479
480   # If we're running under crunch-dispatch, it will have already
481   # pulled the appropriate source tree into its own repository, and
482   # given us that repo's path as $git_dir.
483   #
484   # If we're running a "local" job, we might have to fetch content
485   # from a remote repository.
486   #
487   # (Currently crunch-dispatch gives a local path with --git-dir, but
488   # we might as well accept URLs there too in case it changes its
489   # mind.)
490   my $repo = $git_dir || $Job->{'repository'};
491
492   # Repository can be remote or local. If remote, we'll need to fetch it
493   # to a local dir before doing `git log` et al.
494   my $repo_location;
495
496   if ($repo =~ m{://|^[^/]*:}) {
497     # $repo is a git url we can clone, like git:// or https:// or
498     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
499     # not recognized here because distinguishing that from a local
500     # path is too fragile. If you really need something strange here,
501     # use the ssh:// form.
502     $repo_location = 'remote';
503   } elsif ($repo =~ m{^\.*/}) {
504     # $repo is a local path to a git index. We'll also resolve ../foo
505     # to ../foo/.git if the latter is a directory. To help
506     # disambiguate local paths from named hosted repositories, this
507     # form must be given as ./ or ../ if it's a relative path.
508     if (-d "$repo/.git") {
509       $repo = "$repo/.git";
510     }
511     $repo_location = 'local';
512   } else {
513     # $repo is none of the above. It must be the name of a hosted
514     # repository.
515     my $arv_repo_list = api_call("repositories/list",
516                                  'filters' => [['name','=',$repo]]);
517     my @repos_found = @{$arv_repo_list->{'items'}};
518     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
519     if ($n_found > 0) {
520       Log(undef, "Repository '$repo' -> "
521           . join(", ", map { $_->{'uuid'} } @repos_found));
522     }
523     if ($n_found != 1) {
524       croak("Error: Found $n_found repositories with name '$repo'.");
525     }
526     $repo = $repos_found[0]->{'fetch_url'};
527     $repo_location = 'remote';
528   }
529   Log(undef, "Using $repo_location repository '$repo'");
530   $ENV{"CRUNCH_SRC_URL"} = $repo;
531
532   # Resolve given script_version (we'll call that $treeish here) to a
533   # commit sha1 ($commit).
534   my $treeish = $Job->{'script_version'};
535   my $commit;
536   if ($repo_location eq 'remote') {
537     # We minimize excess object-fetching by re-using the same bare
538     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
539     # just keep adding remotes to it as needed.
540     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
541     my $gitcmd = "git --git-dir=\Q$local_repo\E";
542
543     # Set up our local repo for caching remote objects, making
544     # archives, etc.
545     if (!-d $local_repo) {
546       make_path($local_repo) or croak("Error: could not create $local_repo");
547     }
548     # This works (exits 0 and doesn't delete fetched objects) even
549     # if $local_repo is already initialized:
550     `$gitcmd init --bare`;
551     if ($?) {
552       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
553     }
554
555     # If $treeish looks like a hash (or abbrev hash) we look it up in
556     # our local cache first, since that's cheaper. (We don't want to
557     # do that with tags/branches though -- those change over time, so
558     # they should always be resolved by the remote repo.)
559     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
560       # Hide stderr because it's normal for this to fail:
561       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
562       if ($? == 0 &&
563           # Careful not to resolve a branch named abcdeff to commit 1234567:
564           $sha1 =~ /^$treeish/ &&
565           $sha1 =~ /^([0-9a-f]{40})$/s) {
566         $commit = $1;
567         Log(undef, "Commit $commit already present in $local_repo");
568       }
569     }
570
571     if (!defined $commit) {
572       # If $treeish isn't just a hash or abbrev hash, or isn't here
573       # yet, we need to fetch the remote to resolve it correctly.
574
575       # First, remove all local heads. This prevents a name that does
576       # not exist on the remote from resolving to (or colliding with)
577       # a previously fetched branch or tag (possibly from a different
578       # remote).
579       remove_tree("$local_repo/refs/heads", {keep_root => 1});
580
581       Log(undef, "Fetching objects from $repo to $local_repo");
582       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
583       if ($?) {
584         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
585       }
586     }
587
588     # Now that the data is all here, we will use our local repo for
589     # the rest of our git activities.
590     $repo = $local_repo;
591   }
592
593   my $gitcmd = "git --git-dir=\Q$repo\E";
594   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
595   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
596     croak("`$gitcmd rev-list` exited "
597           .exit_status_s($?)
598           .", '$treeish' not found. Giving up.");
599   }
600   $commit = $1;
601   Log(undef, "Version $treeish is commit $commit");
602
603   if ($commit ne $Job->{'script_version'}) {
604     # Record the real commit id in the database, frozentokey, logs,
605     # etc. -- instead of an abbreviation or a branch name which can
606     # become ambiguous or point to a different commit in the future.
607     if (!$Job->update_attributes('script_version' => $commit)) {
608       croak("Error: failed to update job's script_version attribute");
609     }
610   }
611
612   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
613   add_git_archive("$gitcmd archive ''\Q$commit\E");
614 }
615
616 my $git_archive = combined_git_archive();
617 if (!defined $git_archive) {
618   Log(undef, "Skip install phase (no git archive)");
619   if ($have_slurm) {
620     Log(undef, "Warning: This probably means workers have no source tree!");
621   }
622 }
623 else {
624   my $install_exited;
625   my $install_script_tries_left = 3;
626   for (my $attempts = 0; $attempts < 3; $attempts++) {
627     Log(undef, "Run install script on all workers");
628
629     my @srunargs = ("srun",
630                     "--nodelist=$nodelist",
631                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
632     my @execargs = ("sh", "-c",
633                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
634
635     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
636     my ($install_stderr_r, $install_stderr_w);
637     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
638     set_nonblocking($install_stderr_r);
639     my $installpid = fork();
640     if ($installpid == 0)
641     {
642       close($install_stderr_r);
643       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
644       open(STDOUT, ">&", $install_stderr_w);
645       open(STDERR, ">&", $install_stderr_w);
646       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
647       exit (1);
648     }
649     close($install_stderr_w);
650     # Tell freeze_if_want_freeze how to kill the child, otherwise the
651     # "waitpid(installpid)" loop won't get interrupted by a freeze:
652     $proc{$installpid} = {};
653     my $stderr_buf = '';
654     # Track whether anything appears on stderr other than slurm errors
655     # ("srun: ...") and the "starting: ..." message printed by the
656     # srun subroutine itself:
657     my $stderr_anything_from_script = 0;
658     my $match_our_own_errors = '^(srun: error: |starting: \[)';
659     while ($installpid != waitpid(-1, WNOHANG)) {
660       freeze_if_want_freeze ($installpid);
661       # Wait up to 0.1 seconds for something to appear on stderr, then
662       # do a non-blocking read.
663       my $bits = fhbits($install_stderr_r);
664       select ($bits, undef, $bits, 0.1);
665       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
666       {
667         while ($stderr_buf =~ /^(.*?)\n/) {
668           my $line = $1;
669           substr $stderr_buf, 0, 1+length($line), "";
670           Log(undef, "stderr $line");
671           if ($line !~ /$match_our_own_errors/) {
672             $stderr_anything_from_script = 1;
673           }
674         }
675       }
676     }
677     delete $proc{$installpid};
678     $install_exited = $?;
679     close($install_stderr_r);
680     if (length($stderr_buf) > 0) {
681       if ($stderr_buf !~ /$match_our_own_errors/) {
682         $stderr_anything_from_script = 1;
683       }
684       Log(undef, "stderr $stderr_buf")
685     }
686
687     Log (undef, "Install script exited ".exit_status_s($install_exited));
688     last if $install_exited == 0 || $main::please_freeze;
689     # If the install script fails but doesn't print an error message,
690     # the next thing anyone is likely to do is just run it again in
691     # case it was a transient problem like "slurm communication fails
692     # because the network isn't reliable enough". So we'll just do
693     # that ourselves (up to 3 attempts in total). OTOH, if there is an
694     # error message, the problem is more likely to have a real fix and
695     # we should fail the job so the fixing process can start, instead
696     # of doing 2 more attempts.
697     last if $stderr_anything_from_script;
698   }
699
700   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
701     unlink($tar_filename);
702   }
703
704   if ($install_exited != 0) {
705     croak("Giving up");
706   }
707 }
708
709 foreach (qw (script script_version script_parameters runtime_constraints))
710 {
711   Log (undef,
712        "$_ " .
713        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
714 }
715 foreach (split (/\n/, $Job->{knobs}))
716 {
717   Log (undef, "knob " . $_);
718 }
719
720
721
722 $main::success = undef;
723
724
725
726 ONELEVEL:
727
728 my $thisround_succeeded = 0;
729 my $thisround_failed = 0;
730 my $thisround_failed_multiple = 0;
731 my $working_slot_count = scalar(@slot);
732
733 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
734                        or $a <=> $b } @jobstep_todo;
735 my $level = $jobstep[$jobstep_todo[0]]->{level};
736
737 my $initial_tasks_this_level = 0;
738 foreach my $id (@jobstep_todo) {
739   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
740 }
741
742 # If the number of tasks scheduled at this level #T is smaller than the number
743 # of slots available #S, only use the first #T slots, or the first slot on
744 # each node, whichever number is greater.
745 #
746 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
747 # based on these numbers.  Using fewer slots makes more resources available
748 # to each individual task, which should normally be a better strategy when
749 # there are fewer of them running with less parallelism.
750 #
751 # Note that this calculation is not redone if the initial tasks at
752 # this level queue more tasks at the same level.  This may harm
753 # overall task throughput for that level.
754 my @freeslot;
755 if ($initial_tasks_this_level < @node) {
756   @freeslot = (0..$#node);
757 } elsif ($initial_tasks_this_level < @slot) {
758   @freeslot = (0..$initial_tasks_this_level - 1);
759 } else {
760   @freeslot = (0..$#slot);
761 }
762 my $round_num_freeslots = scalar(@freeslot);
763
764 my %round_max_slots = ();
765 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
766   my $this_slot = $slot[$freeslot[$ii]];
767   my $node_name = $this_slot->{node}->{name};
768   $round_max_slots{$node_name} ||= $this_slot->{cpu};
769   last if (scalar(keys(%round_max_slots)) >= @node);
770 }
771
772 Log(undef, "start level $level with $round_num_freeslots slots");
773 my @holdslot;
774 my %reader;
775 my $progress_is_dirty = 1;
776 my $progress_stats_updated = 0;
777
778 update_progress_stats();
779
780
781 THISROUND:
782 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
783 {
784   my $id = $jobstep_todo[$todo_ptr];
785   my $Jobstep = $jobstep[$id];
786   if ($Jobstep->{level} != $level)
787   {
788     next;
789   }
790
791   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
792   set_nonblocking($reader{$id});
793
794   my $childslot = $freeslot[0];
795   my $childnode = $slot[$childslot]->{node};
796   my $childslotname = join (".",
797                             $slot[$childslot]->{node}->{name},
798                             $slot[$childslot]->{cpu});
799
800   my $childpid = fork();
801   if ($childpid == 0)
802   {
803     $SIG{'INT'} = 'DEFAULT';
804     $SIG{'QUIT'} = 'DEFAULT';
805     $SIG{'TERM'} = 'DEFAULT';
806
807     foreach (values (%reader))
808     {
809       close($_);
810     }
811     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
812     open(STDOUT,">&writer");
813     open(STDERR,">&writer");
814
815     undef $dbh;
816     undef $sth;
817
818     delete $ENV{"GNUPGHOME"};
819     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
820     $ENV{"TASK_QSEQUENCE"} = $id;
821     $ENV{"TASK_SEQUENCE"} = $level;
822     $ENV{"JOB_SCRIPT"} = $Job->{script};
823     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
824       $param =~ tr/a-z/A-Z/;
825       $ENV{"JOB_PARAMETER_$param"} = $value;
826     }
827     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
828     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
829     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
830     $ENV{"HOME"} = $ENV{"TASK_WORK"};
831     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
832     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
833     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
834     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
835
836     $ENV{"GZIP"} = "-n";
837
838     my @srunargs = (
839       "srun",
840       "--nodelist=".$childnode->{name},
841       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
842       "--job-name=$job_id.$id.$$",
843         );
844     my $command =
845         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
846         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
847         ."&& cd $ENV{CRUNCH_TMP} "
848         # These environment variables get used explicitly later in
849         # $command.  No tool is expected to read these values directly.
850         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
851         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
852         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
853         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
854     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
855     if ($docker_hash)
856     {
857       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
858       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
859       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
860       # We only set memory limits if Docker lets us limit both memory and swap.
861       # Memory limits alone have been supported longer, but subprocesses tend
862       # to get SIGKILL if they exceed that without any swap limit set.
863       # See #5642 for additional background.
864       if ($docker_limitmem) {
865         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
866       }
867
868       # Dynamically configure the container to use the host system as its
869       # DNS server.  Get the host's global addresses from the ip command,
870       # and turn them into docker --dns options using gawk.
871       $command .=
872           q{$(ip -o address show scope global |
873               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
874
875       # The source tree and $destdir directory (which we have
876       # installed on the worker host) are available in the container,
877       # under the same path.
878       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
879       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
880
881       # Currently, we make arv-mount's mount point appear at /keep
882       # inside the container (instead of using the same path as the
883       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
884       # crunch scripts and utilities must not rely on this. They must
885       # use $TASK_KEEPMOUNT.
886       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
887       $ENV{TASK_KEEPMOUNT} = "/keep";
888
889       # TASK_WORK is almost exactly like a docker data volume: it
890       # starts out empty, is writable, and persists until no
891       # containers use it any more. We don't use --volumes-from to
892       # share it with other containers: it is only accessible to this
893       # task, and it goes away when this task stops.
894       #
895       # However, a docker data volume is writable only by root unless
896       # the mount point already happens to exist in the container with
897       # different permissions. Therefore, we [1] assume /tmp already
898       # exists in the image and is writable by the crunch user; [2]
899       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
900       # writable if they are created by docker while setting up the
901       # other --volumes); and [3] create $TASK_WORK inside the
902       # container using $build_script.
903       $command .= "--volume=/tmp ";
904       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
905       $ENV{"HOME"} = $ENV{"TASK_WORK"};
906       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
907
908       # TODO: Share a single JOB_WORK volume across all task
909       # containers on a given worker node, and delete it when the job
910       # ends (and, in case that doesn't work, when the next job
911       # starts).
912       #
913       # For now, use the same approach as TASK_WORK above.
914       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
915
916       while (my ($env_key, $env_val) = each %ENV)
917       {
918         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
919           $command .= "--env=\Q$env_key=$env_val\E ";
920         }
921       }
922       $command .= "--env=\QHOME=$ENV{HOME}\E ";
923       $command .= "\Q$docker_hash\E ";
924       $command .= "stdbuf --output=0 --error=0 ";
925       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
926     } else {
927       # Non-docker run
928       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
929       $command .= "stdbuf --output=0 --error=0 ";
930       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
931     }
932
933     my @execargs = ('bash', '-c', $command);
934     srun (\@srunargs, \@execargs, undef, $build_script);
935     # exec() failed, we assume nothing happened.
936     die "srun() failed on build script\n";
937   }
938   close("writer");
939   if (!defined $childpid)
940   {
941     close $reader{$id};
942     delete $reader{$id};
943     next;
944   }
945   shift @freeslot;
946   $proc{$childpid} = { jobstep => $id,
947                        time => time,
948                        slot => $childslot,
949                        jobstepname => "$job_id.$id.$childpid",
950                      };
951   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
952   $slot[$childslot]->{pid} = $childpid;
953
954   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
955   Log ($id, "child $childpid started on $childslotname");
956   $Jobstep->{starttime} = time;
957   $Jobstep->{node} = $childnode->{name};
958   $Jobstep->{slotindex} = $childslot;
959   delete $Jobstep->{stderr};
960   delete $Jobstep->{finishtime};
961   delete $Jobstep->{tempfail};
962
963   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
964   $Jobstep->{'arvados_task'}->save;
965
966   splice @jobstep_todo, $todo_ptr, 1;
967   --$todo_ptr;
968
969   $progress_is_dirty = 1;
970
971   while (!@freeslot
972          ||
973          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
974   {
975     last THISROUND if $main::please_freeze || defined($main::success);
976     if ($main::please_info)
977     {
978       $main::please_info = 0;
979       freeze();
980       create_output_collection();
981       save_meta(1);
982       update_progress_stats();
983     }
984     my $gotsome
985         = readfrompipes ()
986         + reapchildren ();
987     if (!$gotsome)
988     {
989       check_refresh_wanted();
990       check_squeue();
991       update_progress_stats();
992       select (undef, undef, undef, 0.1);
993     }
994     elsif (time - $progress_stats_updated >= 30)
995     {
996       update_progress_stats();
997     }
998     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
999                                         $_->{node}->{hold_count} < 4 } @slot);
1000     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1001         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1002     {
1003       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1004           .($thisround_failed+$thisround_succeeded)
1005           .") -- giving up on this round";
1006       Log (undef, $message);
1007       last THISROUND;
1008     }
1009
1010     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1011     for (my $i=$#freeslot; $i>=0; $i--) {
1012       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1013         push @holdslot, (splice @freeslot, $i, 1);
1014       }
1015     }
1016     for (my $i=$#holdslot; $i>=0; $i--) {
1017       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1018         push @freeslot, (splice @holdslot, $i, 1);
1019       }
1020     }
1021
1022     # give up if no nodes are succeeding
1023     if ($working_slot_count < 1) {
1024       Log(undef, "Every node has failed -- giving up");
1025       last THISROUND;
1026     }
1027   }
1028 }
1029
1030
1031 push @freeslot, splice @holdslot;
1032 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1033
1034
1035 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1036 while (%proc)
1037 {
1038   if ($main::please_continue) {
1039     $main::please_continue = 0;
1040     goto THISROUND;
1041   }
1042   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1043   readfrompipes ();
1044   if (!reapchildren())
1045   {
1046     check_refresh_wanted();
1047     check_squeue();
1048     update_progress_stats();
1049     select (undef, undef, undef, 0.1);
1050     killem (keys %proc) if $main::please_freeze;
1051   }
1052 }
1053
1054 update_progress_stats();
1055 freeze_if_want_freeze();
1056
1057
1058 if (!defined $main::success)
1059 {
1060   if (!@jobstep_todo) {
1061     $main::success = 1;
1062   } elsif ($working_slot_count < 1) {
1063     save_output_collection();
1064     save_meta();
1065     exit(EX_RETRY_UNLOCKED);
1066   } elsif ($thisround_succeeded == 0 &&
1067            ($thisround_failed == 0 || $thisround_failed > 4)) {
1068     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1069     Log (undef, $message);
1070     $main::success = 0;
1071   }
1072 }
1073
1074 goto ONELEVEL if !defined $main::success;
1075
1076
1077 release_allocation();
1078 freeze();
1079 my $collated_output = save_output_collection();
1080 Log (undef, "finish");
1081
1082 save_meta();
1083
1084 my $final_state;
1085 if ($collated_output && $main::success) {
1086   $final_state = 'Complete';
1087 } else {
1088   $final_state = 'Failed';
1089 }
1090 $Job->update_attributes('state' => $final_state);
1091
1092 exit (($final_state eq 'Complete') ? 0 : 1);
1093
1094
1095
1096 sub update_progress_stats
1097 {
1098   $progress_stats_updated = time;
1099   return if !$progress_is_dirty;
1100   my ($todo, $done, $running) = (scalar @jobstep_todo,
1101                                  scalar @jobstep_done,
1102                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1103   $Job->{'tasks_summary'} ||= {};
1104   $Job->{'tasks_summary'}->{'todo'} = $todo;
1105   $Job->{'tasks_summary'}->{'done'} = $done;
1106   $Job->{'tasks_summary'}->{'running'} = $running;
1107   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1108   Log (undef, "status: $done done, $running running, $todo todo");
1109   $progress_is_dirty = 0;
1110 }
1111
1112
1113
1114 sub reapchildren
1115 {
1116   my $pid = waitpid (-1, WNOHANG);
1117   return 0 if $pid <= 0;
1118
1119   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1120                   . "."
1121                   . $slot[$proc{$pid}->{slot}]->{cpu});
1122   my $jobstepid = $proc{$pid}->{jobstep};
1123   my $elapsed = time - $proc{$pid}->{time};
1124   my $Jobstep = $jobstep[$jobstepid];
1125
1126   my $childstatus = $?;
1127   my $exitvalue = $childstatus >> 8;
1128   my $exitinfo = "exit ".exit_status_s($childstatus);
1129   $Jobstep->{'arvados_task'}->reload;
1130   my $task_success = $Jobstep->{'arvados_task'}->{success};
1131
1132   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1133
1134   if (!defined $task_success) {
1135     # task did not indicate one way or the other --> fail
1136     $Jobstep->{'arvados_task'}->{success} = 0;
1137     $Jobstep->{'arvados_task'}->save;
1138     $task_success = 0;
1139   }
1140
1141   if (!$task_success)
1142   {
1143     my $temporary_fail;
1144     $temporary_fail ||= $Jobstep->{tempfail};
1145     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1146
1147     ++$thisround_failed;
1148     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1149
1150     # Check for signs of a failed or misconfigured node
1151     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1152         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1153       # Don't count this against jobstep failure thresholds if this
1154       # node is already suspected faulty and srun exited quickly
1155       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1156           $elapsed < 5) {
1157         Log ($jobstepid, "blaming failure on suspect node " .
1158              $slot[$proc{$pid}->{slot}]->{node}->{name});
1159         $temporary_fail ||= 1;
1160       }
1161       ban_node_by_slot($proc{$pid}->{slot});
1162     }
1163
1164     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1165                              ++$Jobstep->{'failures'},
1166                              $temporary_fail ? 'temporary' : 'permanent',
1167                              $elapsed));
1168
1169     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1170       # Give up on this task, and the whole job
1171       $main::success = 0;
1172     }
1173     # Put this task back on the todo queue
1174     push @jobstep_todo, $jobstepid;
1175     $Job->{'tasks_summary'}->{'failed'}++;
1176   }
1177   else
1178   {
1179     ++$thisround_succeeded;
1180     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1181     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1182     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1183     push @jobstep_done, $jobstepid;
1184     Log ($jobstepid, "success in $elapsed seconds");
1185   }
1186   $Jobstep->{exitcode} = $childstatus;
1187   $Jobstep->{finishtime} = time;
1188   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1189   $Jobstep->{'arvados_task'}->save;
1190   process_stderr ($jobstepid, $task_success);
1191   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1192                            length($Jobstep->{'arvados_task'}->{output}),
1193                            $Jobstep->{'arvados_task'}->{output}));
1194
1195   close $reader{$jobstepid};
1196   delete $reader{$jobstepid};
1197   delete $slot[$proc{$pid}->{slot}]->{pid};
1198   push @freeslot, $proc{$pid}->{slot};
1199   delete $proc{$pid};
1200
1201   if ($task_success) {
1202     # Load new tasks
1203     my $newtask_list = [];
1204     my $newtask_results;
1205     do {
1206       $newtask_results = api_call(
1207         "job_tasks/list",
1208         'where' => {
1209           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1210         },
1211         'order' => 'qsequence',
1212         'offset' => scalar(@$newtask_list),
1213       );
1214       push(@$newtask_list, @{$newtask_results->{items}});
1215     } while (@{$newtask_results->{items}});
1216     foreach my $arvados_task (@$newtask_list) {
1217       my $jobstep = {
1218         'level' => $arvados_task->{'sequence'},
1219         'failures' => 0,
1220         'arvados_task' => $arvados_task
1221       };
1222       push @jobstep, $jobstep;
1223       push @jobstep_todo, $#jobstep;
1224     }
1225   }
1226
1227   $progress_is_dirty = 1;
1228   1;
1229 }
1230
1231 sub check_refresh_wanted
1232 {
1233   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1234   if (@stat && $stat[9] > $latest_refresh) {
1235     $latest_refresh = scalar time;
1236     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1237     for my $attr ('cancelled_at',
1238                   'cancelled_by_user_uuid',
1239                   'cancelled_by_client_uuid',
1240                   'state') {
1241       $Job->{$attr} = $Job2->{$attr};
1242     }
1243     if ($Job->{'state'} ne "Running") {
1244       if ($Job->{'state'} eq "Cancelled") {
1245         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1246       } else {
1247         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1248       }
1249       $main::success = 0;
1250       $main::please_freeze = 1;
1251     }
1252   }
1253 }
1254
1255 sub check_squeue
1256 {
1257   my $last_squeue_check = $squeue_checked;
1258
1259   # Do not call `squeue` or check the kill list more than once every
1260   # 15 seconds.
1261   return if $last_squeue_check > time - 15;
1262   $squeue_checked = time;
1263
1264   # Look for children from which we haven't received stderr data since
1265   # the last squeue check. If no such children exist, all procs are
1266   # alive and there's no need to even look at squeue.
1267   #
1268   # As long as the crunchstat poll interval (10s) is shorter than the
1269   # squeue check interval (15s) this should make the squeue check an
1270   # infrequent event.
1271   my $silent_procs = 0;
1272   for my $jobstep (values %proc)
1273   {
1274     if ($jobstep->{stderr_at} < $last_squeue_check)
1275     {
1276       $silent_procs++;
1277     }
1278   }
1279   return if $silent_procs == 0;
1280
1281   # use killem() on procs whose killtime is reached
1282   while (my ($pid, $jobstep) = each %proc)
1283   {
1284     if (exists $jobstep->{killtime}
1285         && $jobstep->{killtime} <= time
1286         && $jobstep->{stderr_at} < $last_squeue_check)
1287     {
1288       my $sincewhen = "";
1289       if ($jobstep->{stderr_at}) {
1290         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1291       }
1292       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1293       killem ($pid);
1294     }
1295   }
1296
1297   if (!$have_slurm)
1298   {
1299     # here is an opportunity to check for mysterious problems with local procs
1300     return;
1301   }
1302
1303   # Get a list of steps still running.  Note: squeue(1) says --steps
1304   # selects a format (which we override anyway) and allows us to
1305   # specify which steps we're interested in (which we don't).
1306   # Importantly, it also changes the meaning of %j from "job name" to
1307   # "step name" and (although this isn't mentioned explicitly in the
1308   # docs) switches from "one line per job" mode to "one line per step"
1309   # mode. Without it, we'd just get a list of one job, instead of a
1310   # list of N steps.
1311   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1312   if ($? != 0)
1313   {
1314     Log(undef, "warning: squeue exit status $? ($!)");
1315     return;
1316   }
1317   chop @squeue;
1318
1319   # which of my jobsteps are running, according to squeue?
1320   my %ok;
1321   for my $jobstepname (@squeue)
1322   {
1323     $ok{$jobstepname} = 1;
1324   }
1325
1326   # Check for child procs >60s old and not mentioned by squeue.
1327   while (my ($pid, $jobstep) = each %proc)
1328   {
1329     if ($jobstep->{time} < time - 60
1330         && $jobstep->{jobstepname}
1331         && !exists $ok{$jobstep->{jobstepname}}
1332         && !exists $jobstep->{killtime})
1333     {
1334       # According to slurm, this task has ended (successfully or not)
1335       # -- but our srun child hasn't exited. First we must wait (30
1336       # seconds) in case this is just a race between communication
1337       # channels. Then, if our srun child process still hasn't
1338       # terminated, we'll conclude some slurm communication
1339       # error/delay has caused the task to die without notifying srun,
1340       # and we'll kill srun ourselves.
1341       $jobstep->{killtime} = time + 30;
1342       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1343     }
1344   }
1345 }
1346
1347
1348 sub release_allocation
1349 {
1350   if ($have_slurm)
1351   {
1352     Log (undef, "release job allocation");
1353     system "scancel $ENV{SLURM_JOB_ID}";
1354   }
1355 }
1356
1357
1358 sub readfrompipes
1359 {
1360   my $gotsome = 0;
1361   foreach my $job (keys %reader)
1362   {
1363     my $buf;
1364     while (0 < sysread ($reader{$job}, $buf, 8192))
1365     {
1366       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1367       $jobstep[$job]->{stderr_at} = time;
1368       $jobstep[$job]->{stderr} .= $buf;
1369       preprocess_stderr ($job);
1370       if (length ($jobstep[$job]->{stderr}) > 16384)
1371       {
1372         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1373       }
1374       $gotsome = 1;
1375     }
1376   }
1377   return $gotsome;
1378 }
1379
1380
1381 sub preprocess_stderr
1382 {
1383   my $job = shift;
1384
1385   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1386     my $line = $1;
1387     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1388     Log ($job, "stderr $line");
1389     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1390       # whoa.
1391       $main::please_freeze = 1;
1392     }
1393     elsif ($line =~ /srun: error: Node failure on/) {
1394       my $job_slot_index = $jobstep[$job]->{slotindex};
1395       $slot[$job_slot_index]->{node}->{fail_count}++;
1396       $jobstep[$job]->{tempfail} = 1;
1397       ban_node_by_slot($job_slot_index);
1398     }
1399     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1400       $jobstep[$job]->{tempfail} = 1;
1401       ban_node_by_slot($jobstep[$job]->{slotindex});
1402     }
1403     elsif ($line =~ /arvados\.errors\.Keep/) {
1404       $jobstep[$job]->{tempfail} = 1;
1405     }
1406   }
1407 }
1408
1409
1410 sub process_stderr
1411 {
1412   my $job = shift;
1413   my $task_success = shift;
1414   preprocess_stderr ($job);
1415
1416   map {
1417     Log ($job, "stderr $_");
1418   } split ("\n", $jobstep[$job]->{stderr});
1419 }
1420
1421 sub fetch_block
1422 {
1423   my $hash = shift;
1424   my $keep;
1425   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1426     Log(undef, "fetch_block run error from arv-get $hash: $!");
1427     return undef;
1428   }
1429   my $output_block = "";
1430   while (1) {
1431     my $buf;
1432     my $bytes = sysread($keep, $buf, 1024 * 1024);
1433     if (!defined $bytes) {
1434       Log(undef, "fetch_block read error from arv-get: $!");
1435       $output_block = undef;
1436       last;
1437     } elsif ($bytes == 0) {
1438       # sysread returns 0 at the end of the pipe.
1439       last;
1440     } else {
1441       # some bytes were read into buf.
1442       $output_block .= $buf;
1443     }
1444   }
1445   close $keep;
1446   if ($?) {
1447     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1448     $output_block = undef;
1449   }
1450   return $output_block;
1451 }
1452
1453 # Create a collection by concatenating the output of all tasks (each
1454 # task's output is either a manifest fragment, a locator for a
1455 # manifest fragment stored in Keep, or nothing at all). Return the
1456 # portable_data_hash of the new collection.
1457 sub create_output_collection
1458 {
1459   Log (undef, "collate");
1460
1461   my ($child_out, $child_in);
1462   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1463 import arvados
1464 import sys
1465 print (arvados.api("v1").collections().
1466        create(body={"manifest_text": sys.stdin.read()}).
1467        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1468 }, retry_count());
1469
1470   my $task_idx = -1;
1471   my $manifest_size = 0;
1472   for (@jobstep)
1473   {
1474     ++$task_idx;
1475     my $output = $_->{'arvados_task'}->{output};
1476     next if (!defined($output));
1477     my $next_write;
1478     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1479       $next_write = fetch_block($output);
1480     } else {
1481       $next_write = $output;
1482     }
1483     if (defined($next_write)) {
1484       if (!defined(syswrite($child_in, $next_write))) {
1485         # There's been an error writing.  Stop the loop.
1486         # We'll log details about the exit code later.
1487         last;
1488       } else {
1489         $manifest_size += length($next_write);
1490       }
1491     } else {
1492       my $uuid = $_->{'arvados_task'}->{'uuid'};
1493       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1494       $main::success = 0;
1495     }
1496   }
1497   close($child_in);
1498   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1499
1500   my $joboutput;
1501   my $s = IO::Select->new($child_out);
1502   if ($s->can_read(120)) {
1503     sysread($child_out, $joboutput, 1024 * 1024);
1504     waitpid($pid, 0);
1505     if ($?) {
1506       Log(undef, "output collection creation exited " . exit_status_s($?));
1507       $joboutput = undef;
1508     } else {
1509       chomp($joboutput);
1510     }
1511   } else {
1512     Log (undef, "timed out while creating output collection");
1513     foreach my $signal (2, 2, 2, 15, 15, 9) {
1514       kill($signal, $pid);
1515       last if waitpid($pid, WNOHANG) == -1;
1516       sleep(1);
1517     }
1518   }
1519   close($child_out);
1520
1521   return $joboutput;
1522 }
1523
1524 # Calls create_output_collection, logs the result, and returns it.
1525 # If that was successful, save that as the output in the job record.
1526 sub save_output_collection {
1527   my $collated_output = create_output_collection();
1528
1529   if (!$collated_output) {
1530     Log(undef, "Failed to write output collection");
1531   }
1532   else {
1533     Log(undef, "job output $collated_output");
1534     $Job->update_attributes('output' => $collated_output);
1535   }
1536   return $collated_output;
1537 }
1538
1539 sub killem
1540 {
1541   foreach (@_)
1542   {
1543     my $sig = 2;                # SIGINT first
1544     if (exists $proc{$_}->{"sent_$sig"} &&
1545         time - $proc{$_}->{"sent_$sig"} > 4)
1546     {
1547       $sig = 15;                # SIGTERM if SIGINT doesn't work
1548     }
1549     if (exists $proc{$_}->{"sent_$sig"} &&
1550         time - $proc{$_}->{"sent_$sig"} > 4)
1551     {
1552       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1553     }
1554     if (!exists $proc{$_}->{"sent_$sig"})
1555     {
1556       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1557       kill $sig, $_;
1558       select (undef, undef, undef, 0.1);
1559       if ($sig == 2)
1560       {
1561         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1562       }
1563       $proc{$_}->{"sent_$sig"} = time;
1564       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1565     }
1566   }
1567 }
1568
1569
1570 sub fhbits
1571 {
1572   my($bits);
1573   for (@_) {
1574     vec($bits,fileno($_),1) = 1;
1575   }
1576   $bits;
1577 }
1578
1579
1580 # Send log output to Keep via arv-put.
1581 #
1582 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1583 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1584 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1585 # $log_pipe_pid is the pid of the arv-put subprocess.
1586 #
1587 # The only functions that should access these variables directly are:
1588 #
1589 # log_writer_start($logfilename)
1590 #     Starts an arv-put pipe, reading data on stdin and writing it to
1591 #     a $logfilename file in an output collection.
1592 #
1593 # log_writer_read_output([$timeout])
1594 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1595 #     Passes $timeout to the select() call, with a default of 0.01.
1596 #     Returns the result of the last read() call on $log_pipe_out, or
1597 #     -1 if read() wasn't called because select() timed out.
1598 #     Only other log_writer_* functions should need to call this.
1599 #
1600 # log_writer_send($txt)
1601 #     Writes $txt to the output log collection.
1602 #
1603 # log_writer_finish()
1604 #     Closes the arv-put pipe and returns the output that it produces.
1605 #
1606 # log_writer_is_active()
1607 #     Returns a true value if there is currently a live arv-put
1608 #     process, false otherwise.
1609 #
1610 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1611     $log_pipe_pid);
1612
1613 sub log_writer_start($)
1614 {
1615   my $logfilename = shift;
1616   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1617                         'arv-put',
1618                         '--stream',
1619                         '--retries', '3',
1620                         '--filename', $logfilename,
1621                         '-');
1622   $log_pipe_out_buf = "";
1623   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1624 }
1625
1626 sub log_writer_read_output {
1627   my $timeout = shift || 0.01;
1628   my $read = -1;
1629   while ($read && $log_pipe_out_select->can_read($timeout)) {
1630     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1631                  length($log_pipe_out_buf));
1632   }
1633   if (!defined($read)) {
1634     Log(undef, "error reading log manifest from arv-put: $!");
1635   }
1636   return $read;
1637 }
1638
1639 sub log_writer_send($)
1640 {
1641   my $txt = shift;
1642   print $log_pipe_in $txt;
1643   log_writer_read_output();
1644 }
1645
1646 sub log_writer_finish()
1647 {
1648   return unless $log_pipe_pid;
1649
1650   close($log_pipe_in);
1651
1652   my $read_result = log_writer_read_output(120);
1653   if ($read_result == -1) {
1654     Log (undef, "timed out reading from 'arv-put'");
1655   } elsif ($read_result != 0) {
1656     Log(undef, "failed to read arv-put log manifest to EOF");
1657   }
1658
1659   waitpid($log_pipe_pid, 0);
1660   if ($?) {
1661     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1662   }
1663
1664   close($log_pipe_out);
1665   my $arv_put_output = $log_pipe_out_buf;
1666   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1667       $log_pipe_out_select = undef;
1668
1669   return $arv_put_output;
1670 }
1671
1672 sub log_writer_is_active() {
1673   return $log_pipe_pid;
1674 }
1675
1676 sub Log                         # ($jobstep_id, $logmessage)
1677 {
1678   if ($_[1] =~ /\n/) {
1679     for my $line (split (/\n/, $_[1])) {
1680       Log ($_[0], $line);
1681     }
1682     return;
1683   }
1684   my $fh = select STDERR; $|=1; select $fh;
1685   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1686   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1687   $message .= "\n";
1688   my $datetime;
1689   if (log_writer_is_active() || -t STDERR) {
1690     my @gmtime = gmtime;
1691     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1692                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1693   }
1694   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1695
1696   if (log_writer_is_active()) {
1697     log_writer_send($datetime . " " . $message);
1698   }
1699 }
1700
1701
1702 sub croak
1703 {
1704   my ($package, $file, $line) = caller;
1705   my $message = "@_ at $file line $line\n";
1706   Log (undef, $message);
1707   freeze() if @jobstep_todo;
1708   create_output_collection() if @jobstep_todo;
1709   cleanup();
1710   save_meta();
1711   die;
1712 }
1713
1714
1715 sub cleanup
1716 {
1717   return unless $Job;
1718   if ($Job->{'state'} eq 'Cancelled') {
1719     $Job->update_attributes('finished_at' => scalar gmtime);
1720   } else {
1721     $Job->update_attributes('state' => 'Failed');
1722   }
1723 }
1724
1725
1726 sub save_meta
1727 {
1728   my $justcheckpoint = shift; # false if this will be the last meta saved
1729   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1730   return unless log_writer_is_active();
1731
1732   my $log_manifest = "";
1733   if ($Job->{log}) {
1734     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1735     $log_manifest .= $prev_log_coll->{manifest_text};
1736   }
1737   $log_manifest .= log_writer_finish();
1738
1739   my $log_coll = api_call(
1740     "collections/create", ensure_unique_name => 1, collection => {
1741       manifest_text => $log_manifest,
1742       owner_uuid => $Job->{owner_uuid},
1743       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1744     });
1745   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1746   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1747 }
1748
1749
1750 sub freeze_if_want_freeze
1751 {
1752   if ($main::please_freeze)
1753   {
1754     release_allocation();
1755     if (@_)
1756     {
1757       # kill some srun procs before freeze+stop
1758       map { $proc{$_} = {} } @_;
1759       while (%proc)
1760       {
1761         killem (keys %proc);
1762         select (undef, undef, undef, 0.1);
1763         my $died;
1764         while (($died = waitpid (-1, WNOHANG)) > 0)
1765         {
1766           delete $proc{$died};
1767         }
1768       }
1769     }
1770     freeze();
1771     create_output_collection();
1772     cleanup();
1773     save_meta();
1774     exit 1;
1775   }
1776 }
1777
1778
1779 sub freeze
1780 {
1781   Log (undef, "Freeze not implemented");
1782   return;
1783 }
1784
1785
1786 sub thaw
1787 {
1788   croak ("Thaw not implemented");
1789 }
1790
1791
1792 sub freezequote
1793 {
1794   my $s = shift;
1795   $s =~ s/\\/\\\\/g;
1796   $s =~ s/\n/\\n/g;
1797   return $s;
1798 }
1799
1800
1801 sub freezeunquote
1802 {
1803   my $s = shift;
1804   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1805   return $s;
1806 }
1807
1808
1809 sub srun
1810 {
1811   my $srunargs = shift;
1812   my $execargs = shift;
1813   my $opts = shift || {};
1814   my $stdin = shift;
1815   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1816
1817   $Data::Dumper::Terse = 1;
1818   $Data::Dumper::Indent = 0;
1819   my $show_cmd = Dumper($args);
1820   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1821   $show_cmd =~ s/\n/ /g;
1822   if ($opts->{fork}) {
1823     Log(undef, "starting: $show_cmd");
1824   } else {
1825     # This is a child process: parent is in charge of reading our
1826     # stderr and copying it to Log() if needed.
1827     warn "starting: $show_cmd\n";
1828   }
1829
1830   if (defined $stdin) {
1831     my $child = open STDIN, "-|";
1832     defined $child or die "no fork: $!";
1833     if ($child == 0) {
1834       print $stdin or die $!;
1835       close STDOUT or die $!;
1836       exit 0;
1837     }
1838   }
1839
1840   return system (@$args) if $opts->{fork};
1841
1842   exec @$args;
1843   warn "ENV size is ".length(join(" ",%ENV));
1844   die "exec failed: $!: @$args";
1845 }
1846
1847
1848 sub ban_node_by_slot {
1849   # Don't start any new jobsteps on this node for 60 seconds
1850   my $slotid = shift;
1851   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1852   $slot[$slotid]->{node}->{hold_count}++;
1853   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1854 }
1855
1856 sub must_lock_now
1857 {
1858   my ($lockfile, $error_message) = @_;
1859   open L, ">", $lockfile or croak("$lockfile: $!");
1860   if (!flock L, LOCK_EX|LOCK_NB) {
1861     croak("Can't lock $lockfile: $error_message\n");
1862   }
1863 }
1864
1865 sub find_docker_image {
1866   # Given a Keep locator, check to see if it contains a Docker image.
1867   # If so, return its stream name and Docker hash.
1868   # If not, return undef for both values.
1869   my $locator = shift;
1870   my ($streamname, $filename);
1871   my $image = api_call("collections/get", uuid => $locator);
1872   if ($image) {
1873     foreach my $line (split(/\n/, $image->{manifest_text})) {
1874       my @tokens = split(/\s+/, $line);
1875       next if (!@tokens);
1876       $streamname = shift(@tokens);
1877       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1878         if (defined($filename)) {
1879           return (undef, undef);  # More than one file in the Collection.
1880         } else {
1881           $filename = (split(/:/, $filedata, 3))[2];
1882         }
1883       }
1884     }
1885   }
1886   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1887     return ($streamname, $1);
1888   } else {
1889     return (undef, undef);
1890   }
1891 }
1892
1893 sub retry_count {
1894   # Calculate the number of times an operation should be retried,
1895   # assuming exponential backoff, and that we're willing to retry as
1896   # long as tasks have been running.  Enforce a minimum of 3 retries.
1897   my ($starttime, $endtime, $timediff, $retries);
1898   if (@jobstep) {
1899     $starttime = $jobstep[0]->{starttime};
1900     $endtime = $jobstep[-1]->{finishtime};
1901   }
1902   if (!defined($starttime)) {
1903     $timediff = 0;
1904   } elsif (!defined($endtime)) {
1905     $timediff = time - $starttime;
1906   } else {
1907     $timediff = ($endtime - $starttime) - (time - $endtime);
1908   }
1909   if ($timediff > 0) {
1910     $retries = int(log($timediff) / log(2));
1911   } else {
1912     $retries = 1;  # Use the minimum.
1913   }
1914   return ($retries > 3) ? $retries : 3;
1915 }
1916
1917 sub retry_op {
1918   # Pass in two function references.
1919   # This method will be called with the remaining arguments.
1920   # If it dies, retry it with exponential backoff until it succeeds,
1921   # or until the current retry_count is exhausted.  After each failure
1922   # that can be retried, the second function will be called with
1923   # the current try count (0-based), next try time, and error message.
1924   my $operation = shift;
1925   my $retry_callback = shift;
1926   my $retries = retry_count();
1927   foreach my $try_count (0..$retries) {
1928     my $next_try = time + (2 ** $try_count);
1929     my $result = eval { $operation->(@_); };
1930     if (!$@) {
1931       return $result;
1932     } elsif ($try_count < $retries) {
1933       $retry_callback->($try_count, $next_try, $@);
1934       my $sleep_time = $next_try - time;
1935       sleep($sleep_time) if ($sleep_time > 0);
1936     }
1937   }
1938   # Ensure the error message ends in a newline, so Perl doesn't add
1939   # retry_op's line number to it.
1940   chomp($@);
1941   die($@ . "\n");
1942 }
1943
1944 sub api_call {
1945   # Pass in a /-separated API method name, and arguments for it.
1946   # This function will call that method, retrying as needed until
1947   # the current retry_count is exhausted, with a log on the first failure.
1948   my $method_name = shift;
1949   my $log_api_retry = sub {
1950     my ($try_count, $next_try_at, $errmsg) = @_;
1951     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1952     $errmsg =~ s/\s/ /g;
1953     $errmsg =~ s/\s+$//;
1954     my $retry_msg;
1955     if ($next_try_at < time) {
1956       $retry_msg = "Retrying.";
1957     } else {
1958       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1959       $retry_msg = "Retrying at $next_try_fmt.";
1960     }
1961     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1962   };
1963   my $method = $arv;
1964   foreach my $key (split(/\//, $method_name)) {
1965     $method = $method->{$key};
1966   }
1967   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1968 }
1969
1970 sub exit_status_s {
1971   # Given a $?, return a human-readable exit code string like "0" or
1972   # "1" or "0 with signal 1" or "1 with signal 11".
1973   my $exitcode = shift;
1974   my $s = $exitcode >> 8;
1975   if ($exitcode & 0x7f) {
1976     $s .= " with signal " . ($exitcode & 0x7f);
1977   }
1978   if ($exitcode & 0x80) {
1979     $s .= " with core dump";
1980   }
1981   return $s;
1982 }
1983
1984 sub handle_readall {
1985   # Pass in a glob reference to a file handle.
1986   # Read all its contents and return them as a string.
1987   my $fh_glob_ref = shift;
1988   local $/ = undef;
1989   return <$fh_glob_ref>;
1990 }
1991
1992 sub tar_filename_n {
1993   my $n = shift;
1994   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1995 }
1996
1997 sub add_git_archive {
1998   # Pass in a git archive command as a string or list, a la system().
1999   # This method will save its output to be included in the archive sent to the
2000   # build script.
2001   my $git_input;
2002   $git_tar_count++;
2003   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2004     croak("Failed to save git archive: $!");
2005   }
2006   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2007   close($git_input);
2008   waitpid($git_pid, 0);
2009   close(GIT_ARCHIVE);
2010   if ($?) {
2011     croak("Failed to save git archive: git exited " . exit_status_s($?));
2012   }
2013 }
2014
2015 sub combined_git_archive {
2016   # Combine all saved tar archives into a single archive, then return its
2017   # contents in a string.  Return undef if no archives have been saved.
2018   if ($git_tar_count < 1) {
2019     return undef;
2020   }
2021   my $base_tar_name = tar_filename_n(1);
2022   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2023     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2024     if ($tar_exit != 0) {
2025       croak("Error preparing build archive: tar -A exited " .
2026             exit_status_s($tar_exit));
2027     }
2028   }
2029   if (!open(GIT_TAR, "<", $base_tar_name)) {
2030     croak("Could not open build archive: $!");
2031   }
2032   my $tar_contents = handle_readall(\*GIT_TAR);
2033   close(GIT_TAR);
2034   return $tar_contents;
2035 }
2036
2037 sub set_nonblocking {
2038   my $fh = shift;
2039   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2040   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2041 }
2042
2043 __DATA__
2044 #!/usr/bin/env perl
2045 #
2046 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2047 # server invokes this script on individual compute nodes, or localhost if we're
2048 # running a job locally.  It gets called in two modes:
2049 #
2050 # * No arguments: Installation mode.  Read a tar archive from the DATA
2051 #   file handle; it includes the Crunch script's source code, and
2052 #   maybe SDKs as well.  Those should be installed in the proper
2053 #   locations.  This runs outside of any Docker container, so don't try to
2054 #   introspect Crunch's runtime environment.
2055 #
2056 # * With arguments: Crunch script run mode.  This script should set up the
2057 #   environment, then run the command specified in the arguments.  This runs
2058 #   inside any Docker container.
2059
2060 use Fcntl ':flock';
2061 use File::Path qw( make_path remove_tree );
2062 use POSIX qw(getcwd);
2063
2064 use constant TASK_TEMPFAIL => 111;
2065
2066 # Map SDK subdirectories to the path environments they belong to.
2067 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2068
2069 my $destdir = $ENV{"CRUNCH_SRC"};
2070 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2071 my $repo = $ENV{"CRUNCH_SRC_URL"};
2072 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2073 my $job_work = $ENV{"JOB_WORK"};
2074 my $task_work = $ENV{"TASK_WORK"};
2075
2076 open(STDOUT_ORIG, ">&", STDOUT);
2077 open(STDERR_ORIG, ">&", STDERR);
2078
2079 for my $dir ($destdir, $job_work, $task_work) {
2080   if ($dir) {
2081     make_path $dir;
2082     -e $dir or die "Failed to create temporary directory ($dir): $!";
2083   }
2084 }
2085
2086 if ($task_work) {
2087   remove_tree($task_work, {keep_root => 1});
2088 }
2089
2090 ### Crunch script run mode
2091 if (@ARGV) {
2092   # We want to do routine logging during task 0 only.  This gives the user
2093   # the information they need, but avoids repeating the information for every
2094   # task.
2095   my $Log;
2096   if ($ENV{TASK_SEQUENCE} eq "0") {
2097     $Log = sub {
2098       my $msg = shift;
2099       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2100     };
2101   } else {
2102     $Log = sub { };
2103   }
2104
2105   my $python_src = "$install_dir/python";
2106   my $venv_dir = "$job_work/.arvados.venv";
2107   my $venv_built = -e "$venv_dir/bin/activate";
2108   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2109     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2110                  "--python=python2.7", $venv_dir);
2111     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2112     $venv_built = 1;
2113     $Log->("Built Python SDK virtualenv");
2114   }
2115
2116   my $pip_bin = "pip";
2117   if ($venv_built) {
2118     $Log->("Running in Python SDK virtualenv");
2119     $pip_bin = "$venv_dir/bin/pip";
2120     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2121     @ARGV = ("/bin/sh", "-ec",
2122              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2123   } elsif (-d $python_src) {
2124     $Log->("Warning: virtualenv not found inside Docker container default " .
2125            "\$PATH. Can't install Python SDK.");
2126   }
2127
2128   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2129   if ($pkgs) {
2130     $Log->("Using Arvados SDK:");
2131     foreach my $line (split /\n/, $pkgs) {
2132       $Log->($line);
2133     }
2134   } else {
2135     $Log->("Arvados SDK packages not found");
2136   }
2137
2138   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2139     my $sdk_path = "$install_dir/$sdk_dir";
2140     if (-d $sdk_path) {
2141       if ($ENV{$sdk_envkey}) {
2142         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2143       } else {
2144         $ENV{$sdk_envkey} = $sdk_path;
2145       }
2146       $Log->("Arvados SDK added to %s", $sdk_envkey);
2147     }
2148   }
2149
2150   exec(@ARGV);
2151   die "Cannot exec `@ARGV`: $!";
2152 }
2153
2154 ### Installation mode
2155 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2156 flock L, LOCK_EX;
2157 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2158   # This exact git archive (source + arvados sdk) is already installed
2159   # here, so there's no need to reinstall it.
2160
2161   # We must consume our DATA section, though: otherwise the process
2162   # feeding it to us will get SIGPIPE.
2163   my $buf;
2164   while (read(DATA, $buf, 65536)) { }
2165
2166   exit(0);
2167 }
2168
2169 unlink "$destdir.archive_hash";
2170 mkdir $destdir;
2171
2172 do {
2173   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2174   local $SIG{PIPE} = "IGNORE";
2175   warn "Extracting archive: $archive_hash\n";
2176   # --ignore-zeros is necessary sometimes: depending on how much NUL
2177   # padding tar -A put on our combined archive (which in turn depends
2178   # on the length of the component archives) tar without
2179   # --ignore-zeros will exit before consuming stdin and cause close()
2180   # to fail on the resulting SIGPIPE.
2181   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2182     die "Error launching 'tar -xC $destdir': $!";
2183   }
2184   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2185   # get SIGPIPE.  We must feed it data incrementally.
2186   my $tar_input;
2187   while (read(DATA, $tar_input, 65536)) {
2188     print TARX $tar_input;
2189   }
2190   if(!close(TARX)) {
2191     die "'tar -xC $destdir' exited $?: $!";
2192   }
2193 };
2194
2195 mkdir $install_dir;
2196
2197 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2198 if (-d $sdk_root) {
2199   foreach my $sdk_lang (("python",
2200                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2201     if (-d "$sdk_root/$sdk_lang") {
2202       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2203         die "Failed to install $sdk_lang SDK: $!";
2204       }
2205     }
2206   }
2207 }
2208
2209 my $python_dir = "$install_dir/python";
2210 if ((-d $python_dir) and can_run("python2.7")) {
2211   open(my $egg_info_pipe, "-|",
2212        "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null");
2213   my @egg_info_errors = <$egg_info_pipe>;
2214   close($egg_info_pipe);
2215   if ($?) {
2216     if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) {
2217       # egg_info apparently failed because it couldn't ask git for a build tag.
2218       # Specify no build tag.
2219       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2220       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2221       close($pysdk_cfg);
2222     } else {
2223       my $egg_info_exit = $? >> 8;
2224       foreach my $errline (@egg_info_errors) {
2225         print STDERR_ORIG $errline;
2226       }
2227       warn "python setup.py egg_info failed: exit $egg_info_exit";
2228       exit ($egg_info_exit || 1);
2229     }
2230   }
2231 }
2232
2233 # Hide messages from the install script (unless it fails: shell_or_die
2234 # will show $destdir.log in that case).
2235 open(STDOUT, ">>", "$destdir.log");
2236 open(STDERR, ">&", STDOUT);
2237
2238 if (-e "$destdir/crunch_scripts/install") {
2239     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2240 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2241     # Old version
2242     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2243 } elsif (-e "./install.sh") {
2244     shell_or_die (undef, "./install.sh", $install_dir);
2245 }
2246
2247 if ($archive_hash) {
2248     unlink "$destdir.archive_hash.new";
2249     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2250     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2251 }
2252
2253 close L;
2254
2255 sub can_run {
2256   my $command_name = shift;
2257   open(my $which, "-|", "which", $command_name);
2258   while (<$which>) { }
2259   close($which);
2260   return ($? == 0);
2261 }
2262
2263 sub shell_or_die
2264 {
2265   my $exitcode = shift;
2266
2267   if ($ENV{"DEBUG"}) {
2268     print STDERR "@_\n";
2269   }
2270   if (system (@_) != 0) {
2271     my $err = $!;
2272     my $code = $?;
2273     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2274     open STDERR, ">&STDERR_ORIG";
2275     system ("cat $destdir.log >&2");
2276     warn "@_ failed ($err): $exitstatus";
2277     if (defined($exitcode)) {
2278       exit $exitcode;
2279     }
2280     else {
2281       exit (($code >> 8) || 1);
2282     }
2283   }
2284 }
2285
2286 __DATA__