Merge branch '6847-docker-dns-wip'
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 GetOptions('force-unlock' => \$force_unlock,
131            'git-dir=s' => \$git_dir,
132            'job=s' => \$jobspec,
133            'job-api-token=s' => \$job_api_token,
134            'no-clear-tmp' => \$no_clear_tmp,
135            'resume-stash=s' => \$resume_stash,
136            'docker-bin=s' => \$docker_bin,
137     );
138
139 if (defined $job_api_token) {
140   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
141 }
142
143 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
144
145
146 $SIG{'USR1'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 1;
149 };
150 $SIG{'USR2'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 0;
153 };
154
155 my $arv = Arvados->new('apiVersion' => 'v1');
156
157 my $Job;
158 my $job_id;
159 my $dbh;
160 my $sth;
161 my @jobstep;
162
163 my $local_job;
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   $local_job = 0;
169 }
170 else
171 {
172   $local_job = JSON::decode_json($jobspec);
173 }
174
175
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely
178 # misconfigured.
179 my $cmd = ['true'];
180 if (($Job || $local_job)->{docker_image_locator}) {
181   $cmd = [$docker_bin, 'ps', '-q'];
182 }
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
185      $cmd,
186      {fork => 1});
187 if ($? != 0) {
188   Log(undef, "Sanity check failed: ".exit_status_s($?));
189   exit EX_TEMPFAIL;
190 }
191 Log(undef, "Sanity check OK");
192
193
194 my $User = api_call("users/current");
195
196 if (!$local_job) {
197   if (!$force_unlock) {
198     # Claim this job, and make sure nobody else does
199     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
200     if ($@) {
201       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
202       exit EX_TEMPFAIL;
203     };
204   }
205 }
206 else
207 {
208   if (!$resume_stash)
209   {
210     map { croak ("No $_ specified") unless $local_job->{$_} }
211     qw(script script_version script_parameters);
212   }
213
214   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
215   $local_job->{'started_at'} = gmtime;
216   $local_job->{'state'} = 'Running';
217
218   $Job = api_call("jobs/create", job => $local_job);
219 }
220 $job_id = $Job->{'uuid'};
221
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
224
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
228
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
230 if ($? == 0) {
231   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232   chomp($gem_versions);
233   chop($gem_versions);  # Closing parentheses
234 } else {
235   $gem_versions = "";
236 }
237 Log(undef,
238     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
239
240 Log (undef, "check slurm allocation");
241 my @slot;
242 my @node;
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
244 my @sinfo;
245 if (!$have_slurm)
246 {
247   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248   push @sinfo, "$localcpus localhost";
249 }
250 if (exists $ENV{SLURM_NODELIST})
251 {
252   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
253 }
254 foreach (@sinfo)
255 {
256   my ($ncpus, $slurm_nodelist) = split;
257   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
258
259   my @nodelist;
260   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
261   {
262     my $nodelist = $1;
263     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
264     {
265       my $ranges = $1;
266       foreach (split (",", $ranges))
267       {
268         my ($a, $b);
269         if (/(\d+)-(\d+)/)
270         {
271           $a = $1;
272           $b = $2;
273         }
274         else
275         {
276           $a = $_;
277           $b = $_;
278         }
279         push @nodelist, map {
280           my $n = $nodelist;
281           $n =~ s/\[[-,\d]+\]/$_/;
282           $n;
283         } ($a..$b);
284       }
285     }
286     else
287     {
288       push @nodelist, $nodelist;
289     }
290   }
291   foreach my $nodename (@nodelist)
292   {
293     Log (undef, "node $nodename - $ncpus slots");
294     my $node = { name => $nodename,
295                  ncpus => $ncpus,
296                  # The number of consecutive times a task has been dispatched
297                  # to this node and failed.
298                  losing_streak => 0,
299                  # The number of consecutive times that SLURM has reported
300                  # a node failure since the last successful task.
301                  fail_count => 0,
302                  # Don't dispatch work to this node until this time
303                  # (in seconds since the epoch) has passed.
304                  hold_until => 0 };
305     foreach my $cpu (1..$ncpus)
306     {
307       push @slot, { node => $node,
308                     cpu => $cpu };
309     }
310   }
311   push @node, @nodelist;
312 }
313
314
315
316 # Ensure that we get one jobstep running on each allocated node before
317 # we start overloading nodes with concurrent steps
318
319 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
320
321
322 $Job->update_attributes(
323   'tasks_summary' => { 'failed' => 0,
324                        'todo' => 1,
325                        'running' => 0,
326                        'done' => 0 });
327
328 Log (undef, "start");
329 $SIG{'INT'} = sub { $main::please_freeze = 1; };
330 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
331 $SIG{'TERM'} = \&croak;
332 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
333 $SIG{'ALRM'} = sub { $main::please_info = 1; };
334 $SIG{'CONT'} = sub { $main::please_continue = 1; };
335 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
336
337 $main::please_freeze = 0;
338 $main::please_info = 0;
339 $main::please_continue = 0;
340 $main::please_refresh = 0;
341 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
342
343 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
344 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
345 $ENV{"JOB_UUID"} = $job_id;
346
347
348 my @jobstep_todo = ();
349 my @jobstep_done = ();
350 my @jobstep_tomerge = ();
351 my $jobstep_tomerge_level = 0;
352 my $squeue_checked = 0;
353 my $latest_refresh = scalar time;
354
355
356
357 if (defined $Job->{thawedfromkey})
358 {
359   thaw ($Job->{thawedfromkey});
360 }
361 else
362 {
363   my $first_task = api_call("job_tasks/create", job_task => {
364     'job_uuid' => $Job->{'uuid'},
365     'sequence' => 0,
366     'qsequence' => 0,
367     'parameters' => {},
368   });
369   push @jobstep, { 'level' => 0,
370                    'failures' => 0,
371                    'arvados_task' => $first_task,
372                  };
373   push @jobstep_todo, 0;
374 }
375
376
377 if (!$have_slurm)
378 {
379   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
380 }
381
382 my $build_script = handle_readall(\*DATA);
383 my $nodelist = join(",", @node);
384 my $git_tar_count = 0;
385
386 if (!defined $no_clear_tmp) {
387   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
388   Log (undef, "Clean work dirs");
389
390   my $cleanpid = fork();
391   if ($cleanpid == 0)
392   {
393     # Find FUSE mounts that look like Keep mounts (the mount path has the
394     # word "keep") and unmount them.  Then clean up work directories.
395     # TODO: When #5036 is done and widely deployed, we can get rid of the
396     # regular expression and just unmount everything with type fuse.keep.
397     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
399     exit (1);
400   }
401   while (1)
402   {
403     last if $cleanpid == waitpid (-1, WNOHANG);
404     freeze_if_want_freeze ($cleanpid);
405     select (undef, undef, undef, 0.1);
406   }
407   if ($?) {
408     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
409     exit(EX_RETRY_UNLOCKED);
410   }
411 }
412
413 # If this job requires a Docker image, install that.
414 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
415 if ($docker_locator = $Job->{docker_image_locator}) {
416   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
417   if (!$docker_hash)
418   {
419     croak("No Docker image hash found from locator $docker_locator");
420   }
421   $docker_stream =~ s/^\.//;
422   my $docker_install_script = qq{
423 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
424     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
425 fi
426 };
427   my $docker_pid = fork();
428   if ($docker_pid == 0)
429   {
430     srun (["srun", "--nodelist=" . join(',', @node)],
431           ["/bin/sh", "-ec", $docker_install_script]);
432     exit ($?);
433   }
434   while (1)
435   {
436     last if $docker_pid == waitpid (-1, WNOHANG);
437     freeze_if_want_freeze ($docker_pid);
438     select (undef, undef, undef, 0.1);
439   }
440   if ($? != 0)
441   {
442     croak("Installing Docker image from $docker_locator exited "
443           .exit_status_s($?));
444   }
445
446   # Determine whether this version of Docker supports memory+swap limits.
447   srun(["srun", "--nodelist=" . $node[0]],
448        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
449       {fork => 1});
450   $docker_limitmem = ($? == 0);
451
452   if ($Job->{arvados_sdk_version}) {
453     # The job also specifies an Arvados SDK version.  Add the SDKs to the
454     # tar file for the build script to install.
455     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
456                        $Job->{arvados_sdk_version}));
457     add_git_archive("git", "--git-dir=$git_dir", "archive",
458                     "--prefix=.arvados.sdk/",
459                     $Job->{arvados_sdk_version}, "sdk");
460   }
461 }
462
463 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
464   # If script_version looks like an absolute path, *and* the --git-dir
465   # argument was not given -- which implies we were not invoked by
466   # crunch-dispatch -- we will use the given path as a working
467   # directory instead of resolving script_version to a git commit (or
468   # doing anything else with git).
469   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
470   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
471 }
472 else {
473   # Resolve the given script_version to a git commit sha1. Also, if
474   # the repository is remote, clone it into our local filesystem: this
475   # ensures "git archive" will work, and is necessary to reliably
476   # resolve a symbolic script_version like "master^".
477   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
478
479   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
480
481   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
482
483   # If we're running under crunch-dispatch, it will have already
484   # pulled the appropriate source tree into its own repository, and
485   # given us that repo's path as $git_dir.
486   #
487   # If we're running a "local" job, we might have to fetch content
488   # from a remote repository.
489   #
490   # (Currently crunch-dispatch gives a local path with --git-dir, but
491   # we might as well accept URLs there too in case it changes its
492   # mind.)
493   my $repo = $git_dir || $Job->{'repository'};
494
495   # Repository can be remote or local. If remote, we'll need to fetch it
496   # to a local dir before doing `git log` et al.
497   my $repo_location;
498
499   if ($repo =~ m{://|^[^/]*:}) {
500     # $repo is a git url we can clone, like git:// or https:// or
501     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
502     # not recognized here because distinguishing that from a local
503     # path is too fragile. If you really need something strange here,
504     # use the ssh:// form.
505     $repo_location = 'remote';
506   } elsif ($repo =~ m{^\.*/}) {
507     # $repo is a local path to a git index. We'll also resolve ../foo
508     # to ../foo/.git if the latter is a directory. To help
509     # disambiguate local paths from named hosted repositories, this
510     # form must be given as ./ or ../ if it's a relative path.
511     if (-d "$repo/.git") {
512       $repo = "$repo/.git";
513     }
514     $repo_location = 'local';
515   } else {
516     # $repo is none of the above. It must be the name of a hosted
517     # repository.
518     my $arv_repo_list = api_call("repositories/list",
519                                  'filters' => [['name','=',$repo]]);
520     my @repos_found = @{$arv_repo_list->{'items'}};
521     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
522     if ($n_found > 0) {
523       Log(undef, "Repository '$repo' -> "
524           . join(", ", map { $_->{'uuid'} } @repos_found));
525     }
526     if ($n_found != 1) {
527       croak("Error: Found $n_found repositories with name '$repo'.");
528     }
529     $repo = $repos_found[0]->{'fetch_url'};
530     $repo_location = 'remote';
531   }
532   Log(undef, "Using $repo_location repository '$repo'");
533   $ENV{"CRUNCH_SRC_URL"} = $repo;
534
535   # Resolve given script_version (we'll call that $treeish here) to a
536   # commit sha1 ($commit).
537   my $treeish = $Job->{'script_version'};
538   my $commit;
539   if ($repo_location eq 'remote') {
540     # We minimize excess object-fetching by re-using the same bare
541     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
542     # just keep adding remotes to it as needed.
543     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
544     my $gitcmd = "git --git-dir=\Q$local_repo\E";
545
546     # Set up our local repo for caching remote objects, making
547     # archives, etc.
548     if (!-d $local_repo) {
549       make_path($local_repo) or croak("Error: could not create $local_repo");
550     }
551     # This works (exits 0 and doesn't delete fetched objects) even
552     # if $local_repo is already initialized:
553     `$gitcmd init --bare`;
554     if ($?) {
555       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
556     }
557
558     # If $treeish looks like a hash (or abbrev hash) we look it up in
559     # our local cache first, since that's cheaper. (We don't want to
560     # do that with tags/branches though -- those change over time, so
561     # they should always be resolved by the remote repo.)
562     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
563       # Hide stderr because it's normal for this to fail:
564       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
565       if ($? == 0 &&
566           # Careful not to resolve a branch named abcdeff to commit 1234567:
567           $sha1 =~ /^$treeish/ &&
568           $sha1 =~ /^([0-9a-f]{40})$/s) {
569         $commit = $1;
570         Log(undef, "Commit $commit already present in $local_repo");
571       }
572     }
573
574     if (!defined $commit) {
575       # If $treeish isn't just a hash or abbrev hash, or isn't here
576       # yet, we need to fetch the remote to resolve it correctly.
577
578       # First, remove all local heads. This prevents a name that does
579       # not exist on the remote from resolving to (or colliding with)
580       # a previously fetched branch or tag (possibly from a different
581       # remote).
582       remove_tree("$local_repo/refs/heads", {keep_root => 1});
583
584       Log(undef, "Fetching objects from $repo to $local_repo");
585       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
586       if ($?) {
587         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
588       }
589     }
590
591     # Now that the data is all here, we will use our local repo for
592     # the rest of our git activities.
593     $repo = $local_repo;
594   }
595
596   my $gitcmd = "git --git-dir=\Q$repo\E";
597   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
598   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
599     croak("`$gitcmd rev-list` exited "
600           .exit_status_s($?)
601           .", '$treeish' not found, giving up");
602   }
603   $commit = $1;
604   Log(undef, "Version $treeish is commit $commit");
605
606   if ($commit ne $Job->{'script_version'}) {
607     # Record the real commit id in the database, frozentokey, logs,
608     # etc. -- instead of an abbreviation or a branch name which can
609     # become ambiguous or point to a different commit in the future.
610     if (!$Job->update_attributes('script_version' => $commit)) {
611       croak("Error: failed to update job's script_version attribute");
612     }
613   }
614
615   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
616   add_git_archive("$gitcmd archive ''\Q$commit\E");
617 }
618
619 my $git_archive = combined_git_archive();
620 if (!defined $git_archive) {
621   Log(undef, "Skip install phase (no git archive)");
622   if ($have_slurm) {
623     Log(undef, "Warning: This probably means workers have no source tree!");
624   }
625 }
626 else {
627   my $install_exited;
628   my $install_script_tries_left = 3;
629   for (my $attempts = 0; $attempts < 3; $attempts++) {
630     Log(undef, "Run install script on all workers");
631
632     my @srunargs = ("srun",
633                     "--nodelist=$nodelist",
634                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
635     my @execargs = ("sh", "-c",
636                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
637
638     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
639     my ($install_stderr_r, $install_stderr_w);
640     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
641     set_nonblocking($install_stderr_r);
642     my $installpid = fork();
643     if ($installpid == 0)
644     {
645       close($install_stderr_r);
646       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
647       open(STDOUT, ">&", $install_stderr_w);
648       open(STDERR, ">&", $install_stderr_w);
649       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
650       exit (1);
651     }
652     close($install_stderr_w);
653     # Tell freeze_if_want_freeze how to kill the child, otherwise the
654     # "waitpid(installpid)" loop won't get interrupted by a freeze:
655     $proc{$installpid} = {};
656     my $stderr_buf = '';
657     # Track whether anything appears on stderr other than slurm errors
658     # ("srun: ...") and the "starting: ..." message printed by the
659     # srun subroutine itself:
660     my $stderr_anything_from_script = 0;
661     my $match_our_own_errors = '^(srun: error: |starting: \[)';
662     while ($installpid != waitpid(-1, WNOHANG)) {
663       freeze_if_want_freeze ($installpid);
664       # Wait up to 0.1 seconds for something to appear on stderr, then
665       # do a non-blocking read.
666       my $bits = fhbits($install_stderr_r);
667       select ($bits, undef, $bits, 0.1);
668       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
669       {
670         while ($stderr_buf =~ /^(.*?)\n/) {
671           my $line = $1;
672           substr $stderr_buf, 0, 1+length($line), "";
673           Log(undef, "stderr $line");
674           if ($line !~ /$match_our_own_errors/) {
675             $stderr_anything_from_script = 1;
676           }
677         }
678       }
679     }
680     delete $proc{$installpid};
681     $install_exited = $?;
682     close($install_stderr_r);
683     if (length($stderr_buf) > 0) {
684       if ($stderr_buf !~ /$match_our_own_errors/) {
685         $stderr_anything_from_script = 1;
686       }
687       Log(undef, "stderr $stderr_buf")
688     }
689
690     Log (undef, "Install script exited ".exit_status_s($install_exited));
691     last if $install_exited == 0 || $main::please_freeze;
692     # If the install script fails but doesn't print an error message,
693     # the next thing anyone is likely to do is just run it again in
694     # case it was a transient problem like "slurm communication fails
695     # because the network isn't reliable enough". So we'll just do
696     # that ourselves (up to 3 attempts in total). OTOH, if there is an
697     # error message, the problem is more likely to have a real fix and
698     # we should fail the job so the fixing process can start, instead
699     # of doing 2 more attempts.
700     last if $stderr_anything_from_script;
701   }
702
703   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
704     unlink($tar_filename);
705   }
706
707   if ($install_exited != 0) {
708     croak("Giving up");
709   }
710 }
711
712 foreach (qw (script script_version script_parameters runtime_constraints))
713 {
714   Log (undef,
715        "$_ " .
716        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
717 }
718 foreach (split (/\n/, $Job->{knobs}))
719 {
720   Log (undef, "knob " . $_);
721 }
722
723
724
725 $main::success = undef;
726
727
728
729 ONELEVEL:
730
731 my $thisround_succeeded = 0;
732 my $thisround_failed = 0;
733 my $thisround_failed_multiple = 0;
734 my $working_slot_count = scalar(@slot);
735
736 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
737                        or $a <=> $b } @jobstep_todo;
738 my $level = $jobstep[$jobstep_todo[0]]->{level};
739
740 my $initial_tasks_this_level = 0;
741 foreach my $id (@jobstep_todo) {
742   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
743 }
744
745 # If the number of tasks scheduled at this level #T is smaller than the number
746 # of slots available #S, only use the first #T slots, or the first slot on
747 # each node, whichever number is greater.
748 #
749 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
750 # based on these numbers.  Using fewer slots makes more resources available
751 # to each individual task, which should normally be a better strategy when
752 # there are fewer of them running with less parallelism.
753 #
754 # Note that this calculation is not redone if the initial tasks at
755 # this level queue more tasks at the same level.  This may harm
756 # overall task throughput for that level.
757 my @freeslot;
758 if ($initial_tasks_this_level < @node) {
759   @freeslot = (0..$#node);
760 } elsif ($initial_tasks_this_level < @slot) {
761   @freeslot = (0..$initial_tasks_this_level - 1);
762 } else {
763   @freeslot = (0..$#slot);
764 }
765 my $round_num_freeslots = scalar(@freeslot);
766
767 my %round_max_slots = ();
768 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
769   my $this_slot = $slot[$freeslot[$ii]];
770   my $node_name = $this_slot->{node}->{name};
771   $round_max_slots{$node_name} ||= $this_slot->{cpu};
772   last if (scalar(keys(%round_max_slots)) >= @node);
773 }
774
775 Log(undef, "start level $level with $round_num_freeslots slots");
776 my @holdslot;
777 my %reader;
778 my $progress_is_dirty = 1;
779 my $progress_stats_updated = 0;
780
781 update_progress_stats();
782
783
784 THISROUND:
785 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
786 {
787   my $id = $jobstep_todo[$todo_ptr];
788   my $Jobstep = $jobstep[$id];
789   if ($Jobstep->{level} != $level)
790   {
791     next;
792   }
793
794   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
795   set_nonblocking($reader{$id});
796
797   my $childslot = $freeslot[0];
798   my $childnode = $slot[$childslot]->{node};
799   my $childslotname = join (".",
800                             $slot[$childslot]->{node}->{name},
801                             $slot[$childslot]->{cpu});
802
803   my $childpid = fork();
804   if ($childpid == 0)
805   {
806     $SIG{'INT'} = 'DEFAULT';
807     $SIG{'QUIT'} = 'DEFAULT';
808     $SIG{'TERM'} = 'DEFAULT';
809
810     foreach (values (%reader))
811     {
812       close($_);
813     }
814     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
815     open(STDOUT,">&writer");
816     open(STDERR,">&writer");
817
818     undef $dbh;
819     undef $sth;
820
821     delete $ENV{"GNUPGHOME"};
822     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
823     $ENV{"TASK_QSEQUENCE"} = $id;
824     $ENV{"TASK_SEQUENCE"} = $level;
825     $ENV{"JOB_SCRIPT"} = $Job->{script};
826     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
827       $param =~ tr/a-z/A-Z/;
828       $ENV{"JOB_PARAMETER_$param"} = $value;
829     }
830     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
831     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
832     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
833     $ENV{"HOME"} = $ENV{"TASK_WORK"};
834     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
835     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
836     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
837     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
838
839     $ENV{"GZIP"} = "-n";
840
841     my @srunargs = (
842       "srun",
843       "--nodelist=".$childnode->{name},
844       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
845       "--job-name=$job_id.$id.$$",
846         );
847     my $command =
848         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
849         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
850         ."&& cd $ENV{CRUNCH_TMP} "
851         # These environment variables get used explicitly later in
852         # $command.  No tool is expected to read these values directly.
853         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
854         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
855         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
856         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
857     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
858     if ($docker_hash)
859     {
860       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
861       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
862       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
863       # We only set memory limits if Docker lets us limit both memory and swap.
864       # Memory limits alone have been supported longer, but subprocesses tend
865       # to get SIGKILL if they exceed that without any swap limit set.
866       # See #5642 for additional background.
867       if ($docker_limitmem) {
868         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
869       }
870
871       # The source tree and $destdir directory (which we have
872       # installed on the worker host) are available in the container,
873       # under the same path.
874       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
875       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
876
877       # Currently, we make arv-mount's mount point appear at /keep
878       # inside the container (instead of using the same path as the
879       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
880       # crunch scripts and utilities must not rely on this. They must
881       # use $TASK_KEEPMOUNT.
882       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
883       $ENV{TASK_KEEPMOUNT} = "/keep";
884
885       # TASK_WORK is almost exactly like a docker data volume: it
886       # starts out empty, is writable, and persists until no
887       # containers use it any more. We don't use --volumes-from to
888       # share it with other containers: it is only accessible to this
889       # task, and it goes away when this task stops.
890       #
891       # However, a docker data volume is writable only by root unless
892       # the mount point already happens to exist in the container with
893       # different permissions. Therefore, we [1] assume /tmp already
894       # exists in the image and is writable by the crunch user; [2]
895       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
896       # writable if they are created by docker while setting up the
897       # other --volumes); and [3] create $TASK_WORK inside the
898       # container using $build_script.
899       $command .= "--volume=/tmp ";
900       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
901       $ENV{"HOME"} = $ENV{"TASK_WORK"};
902       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
903
904       # TODO: Share a single JOB_WORK volume across all task
905       # containers on a given worker node, and delete it when the job
906       # ends (and, in case that doesn't work, when the next job
907       # starts).
908       #
909       # For now, use the same approach as TASK_WORK above.
910       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
911
912       while (my ($env_key, $env_val) = each %ENV)
913       {
914         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
915           $command .= "--env=\Q$env_key=$env_val\E ";
916         }
917       }
918       $command .= "--env=\QHOME=$ENV{HOME}\E ";
919       $command .= "\Q$docker_hash\E ";
920       $command .= "stdbuf --output=0 --error=0 ";
921       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
922     } else {
923       # Non-docker run
924       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
925       $command .= "stdbuf --output=0 --error=0 ";
926       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
927     }
928
929     my @execargs = ('bash', '-c', $command);
930     srun (\@srunargs, \@execargs, undef, $build_script);
931     # exec() failed, we assume nothing happened.
932     die "srun() failed on build script\n";
933   }
934   close("writer");
935   if (!defined $childpid)
936   {
937     close $reader{$id};
938     delete $reader{$id};
939     next;
940   }
941   shift @freeslot;
942   $proc{$childpid} = { jobstep => $id,
943                        time => time,
944                        slot => $childslot,
945                        jobstepname => "$job_id.$id.$childpid",
946                      };
947   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
948   $slot[$childslot]->{pid} = $childpid;
949
950   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
951   Log ($id, "child $childpid started on $childslotname");
952   $Jobstep->{starttime} = time;
953   $Jobstep->{node} = $childnode->{name};
954   $Jobstep->{slotindex} = $childslot;
955   delete $Jobstep->{stderr};
956   delete $Jobstep->{finishtime};
957   delete $Jobstep->{tempfail};
958
959   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
960   $Jobstep->{'arvados_task'}->save;
961
962   splice @jobstep_todo, $todo_ptr, 1;
963   --$todo_ptr;
964
965   $progress_is_dirty = 1;
966
967   while (!@freeslot
968          ||
969          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
970   {
971     last THISROUND if $main::please_freeze || defined($main::success);
972     if ($main::please_info)
973     {
974       $main::please_info = 0;
975       freeze();
976       create_output_collection();
977       save_meta(1);
978       update_progress_stats();
979     }
980     my $gotsome
981         = readfrompipes ()
982         + reapchildren ();
983     if (!$gotsome)
984     {
985       check_refresh_wanted();
986       check_squeue();
987       update_progress_stats();
988       select (undef, undef, undef, 0.1);
989     }
990     elsif (time - $progress_stats_updated >= 30)
991     {
992       update_progress_stats();
993     }
994     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
995                                         $_->{node}->{hold_count} < 4 } @slot);
996     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
997         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
998     {
999       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1000           .($thisround_failed+$thisround_succeeded)
1001           .") -- giving up on this round";
1002       Log (undef, $message);
1003       last THISROUND;
1004     }
1005
1006     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1007     for (my $i=$#freeslot; $i>=0; $i--) {
1008       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1009         push @holdslot, (splice @freeslot, $i, 1);
1010       }
1011     }
1012     for (my $i=$#holdslot; $i>=0; $i--) {
1013       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1014         push @freeslot, (splice @holdslot, $i, 1);
1015       }
1016     }
1017
1018     # give up if no nodes are succeeding
1019     if ($working_slot_count < 1) {
1020       Log(undef, "Every node has failed -- giving up");
1021       last THISROUND;
1022     }
1023   }
1024 }
1025
1026
1027 push @freeslot, splice @holdslot;
1028 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1029
1030
1031 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1032 while (%proc)
1033 {
1034   if ($main::please_continue) {
1035     $main::please_continue = 0;
1036     goto THISROUND;
1037   }
1038   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1039   readfrompipes ();
1040   if (!reapchildren())
1041   {
1042     check_refresh_wanted();
1043     check_squeue();
1044     update_progress_stats();
1045     select (undef, undef, undef, 0.1);
1046     killem (keys %proc) if $main::please_freeze;
1047   }
1048 }
1049
1050 update_progress_stats();
1051 freeze_if_want_freeze();
1052
1053
1054 if (!defined $main::success)
1055 {
1056   if (!@jobstep_todo) {
1057     $main::success = 1;
1058   } elsif ($working_slot_count < 1) {
1059     save_output_collection();
1060     save_meta();
1061     exit(EX_RETRY_UNLOCKED);
1062   } elsif ($thisround_succeeded == 0 &&
1063            ($thisround_failed == 0 || $thisround_failed > 4)) {
1064     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1065     Log (undef, $message);
1066     $main::success = 0;
1067   }
1068 }
1069
1070 goto ONELEVEL if !defined $main::success;
1071
1072
1073 release_allocation();
1074 freeze();
1075 my $collated_output = save_output_collection();
1076 Log (undef, "finish");
1077
1078 save_meta();
1079
1080 my $final_state;
1081 if ($collated_output && $main::success) {
1082   $final_state = 'Complete';
1083 } else {
1084   $final_state = 'Failed';
1085 }
1086 $Job->update_attributes('state' => $final_state);
1087
1088 exit (($final_state eq 'Complete') ? 0 : 1);
1089
1090
1091
1092 sub update_progress_stats
1093 {
1094   $progress_stats_updated = time;
1095   return if !$progress_is_dirty;
1096   my ($todo, $done, $running) = (scalar @jobstep_todo,
1097                                  scalar @jobstep_done,
1098                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1099   $Job->{'tasks_summary'} ||= {};
1100   $Job->{'tasks_summary'}->{'todo'} = $todo;
1101   $Job->{'tasks_summary'}->{'done'} = $done;
1102   $Job->{'tasks_summary'}->{'running'} = $running;
1103   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1104   Log (undef, "status: $done done, $running running, $todo todo");
1105   $progress_is_dirty = 0;
1106 }
1107
1108
1109
1110 sub reapchildren
1111 {
1112   my $pid = waitpid (-1, WNOHANG);
1113   return 0 if $pid <= 0;
1114
1115   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1116                   . "."
1117                   . $slot[$proc{$pid}->{slot}]->{cpu});
1118   my $jobstepid = $proc{$pid}->{jobstep};
1119   my $elapsed = time - $proc{$pid}->{time};
1120   my $Jobstep = $jobstep[$jobstepid];
1121
1122   my $childstatus = $?;
1123   my $exitvalue = $childstatus >> 8;
1124   my $exitinfo = "exit ".exit_status_s($childstatus);
1125   $Jobstep->{'arvados_task'}->reload;
1126   my $task_success = $Jobstep->{'arvados_task'}->{success};
1127
1128   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1129
1130   if (!defined $task_success) {
1131     # task did not indicate one way or the other --> fail
1132     $Jobstep->{'arvados_task'}->{success} = 0;
1133     $Jobstep->{'arvados_task'}->save;
1134     $task_success = 0;
1135   }
1136
1137   if (!$task_success)
1138   {
1139     my $temporary_fail;
1140     $temporary_fail ||= $Jobstep->{tempfail};
1141     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1142
1143     ++$thisround_failed;
1144     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1145
1146     # Check for signs of a failed or misconfigured node
1147     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1148         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1149       # Don't count this against jobstep failure thresholds if this
1150       # node is already suspected faulty and srun exited quickly
1151       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1152           $elapsed < 5) {
1153         Log ($jobstepid, "blaming failure on suspect node " .
1154              $slot[$proc{$pid}->{slot}]->{node}->{name});
1155         $temporary_fail ||= 1;
1156       }
1157       ban_node_by_slot($proc{$pid}->{slot});
1158     }
1159
1160     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1161                              ++$Jobstep->{'failures'},
1162                              $temporary_fail ? 'temporary' : 'permanent',
1163                              $elapsed));
1164
1165     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1166       # Give up on this task, and the whole job
1167       $main::success = 0;
1168     }
1169     # Put this task back on the todo queue
1170     push @jobstep_todo, $jobstepid;
1171     $Job->{'tasks_summary'}->{'failed'}++;
1172   }
1173   else
1174   {
1175     ++$thisround_succeeded;
1176     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1177     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1178     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1179     push @jobstep_done, $jobstepid;
1180     Log ($jobstepid, "success in $elapsed seconds");
1181   }
1182   $Jobstep->{exitcode} = $childstatus;
1183   $Jobstep->{finishtime} = time;
1184   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1185   $Jobstep->{'arvados_task'}->save;
1186   process_stderr ($jobstepid, $task_success);
1187   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1188                            length($Jobstep->{'arvados_task'}->{output}),
1189                            $Jobstep->{'arvados_task'}->{output}));
1190
1191   close $reader{$jobstepid};
1192   delete $reader{$jobstepid};
1193   delete $slot[$proc{$pid}->{slot}]->{pid};
1194   push @freeslot, $proc{$pid}->{slot};
1195   delete $proc{$pid};
1196
1197   if ($task_success) {
1198     # Load new tasks
1199     my $newtask_list = [];
1200     my $newtask_results;
1201     do {
1202       $newtask_results = api_call(
1203         "job_tasks/list",
1204         'where' => {
1205           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1206         },
1207         'order' => 'qsequence',
1208         'offset' => scalar(@$newtask_list),
1209       );
1210       push(@$newtask_list, @{$newtask_results->{items}});
1211     } while (@{$newtask_results->{items}});
1212     foreach my $arvados_task (@$newtask_list) {
1213       my $jobstep = {
1214         'level' => $arvados_task->{'sequence'},
1215         'failures' => 0,
1216         'arvados_task' => $arvados_task
1217       };
1218       push @jobstep, $jobstep;
1219       push @jobstep_todo, $#jobstep;
1220     }
1221   }
1222
1223   $progress_is_dirty = 1;
1224   1;
1225 }
1226
1227 sub check_refresh_wanted
1228 {
1229   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1230   if (@stat && $stat[9] > $latest_refresh) {
1231     $latest_refresh = scalar time;
1232     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1233     for my $attr ('cancelled_at',
1234                   'cancelled_by_user_uuid',
1235                   'cancelled_by_client_uuid',
1236                   'state') {
1237       $Job->{$attr} = $Job2->{$attr};
1238     }
1239     if ($Job->{'state'} ne "Running") {
1240       if ($Job->{'state'} eq "Cancelled") {
1241         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1242       } else {
1243         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1244       }
1245       $main::success = 0;
1246       $main::please_freeze = 1;
1247     }
1248   }
1249 }
1250
1251 sub check_squeue
1252 {
1253   my $last_squeue_check = $squeue_checked;
1254
1255   # Do not call `squeue` or check the kill list more than once every
1256   # 15 seconds.
1257   return if $last_squeue_check > time - 15;
1258   $squeue_checked = time;
1259
1260   # Look for children from which we haven't received stderr data since
1261   # the last squeue check. If no such children exist, all procs are
1262   # alive and there's no need to even look at squeue.
1263   #
1264   # As long as the crunchstat poll interval (10s) is shorter than the
1265   # squeue check interval (15s) this should make the squeue check an
1266   # infrequent event.
1267   my $silent_procs = 0;
1268   for my $jobstep (values %proc)
1269   {
1270     if ($jobstep->{stderr_at} < $last_squeue_check)
1271     {
1272       $silent_procs++;
1273     }
1274   }
1275   return if $silent_procs == 0;
1276
1277   # use killem() on procs whose killtime is reached
1278   while (my ($pid, $jobstep) = each %proc)
1279   {
1280     if (exists $jobstep->{killtime}
1281         && $jobstep->{killtime} <= time
1282         && $jobstep->{stderr_at} < $last_squeue_check)
1283     {
1284       my $sincewhen = "";
1285       if ($jobstep->{stderr_at}) {
1286         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1287       }
1288       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1289       killem ($pid);
1290     }
1291   }
1292
1293   if (!$have_slurm)
1294   {
1295     # here is an opportunity to check for mysterious problems with local procs
1296     return;
1297   }
1298
1299   # Get a list of steps still running.  Note: squeue(1) says --steps
1300   # selects a format (which we override anyway) and allows us to
1301   # specify which steps we're interested in (which we don't).
1302   # Importantly, it also changes the meaning of %j from "job name" to
1303   # "step name" and (although this isn't mentioned explicitly in the
1304   # docs) switches from "one line per job" mode to "one line per step"
1305   # mode. Without it, we'd just get a list of one job, instead of a
1306   # list of N steps.
1307   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1308   if ($? != 0)
1309   {
1310     Log(undef, "warning: squeue exit status $? ($!)");
1311     return;
1312   }
1313   chop @squeue;
1314
1315   # which of my jobsteps are running, according to squeue?
1316   my %ok;
1317   for my $jobstepname (@squeue)
1318   {
1319     $ok{$jobstepname} = 1;
1320   }
1321
1322   # Check for child procs >60s old and not mentioned by squeue.
1323   while (my ($pid, $jobstep) = each %proc)
1324   {
1325     if ($jobstep->{time} < time - 60
1326         && $jobstep->{jobstepname}
1327         && !exists $ok{$jobstep->{jobstepname}}
1328         && !exists $jobstep->{killtime})
1329     {
1330       # According to slurm, this task has ended (successfully or not)
1331       # -- but our srun child hasn't exited. First we must wait (30
1332       # seconds) in case this is just a race between communication
1333       # channels. Then, if our srun child process still hasn't
1334       # terminated, we'll conclude some slurm communication
1335       # error/delay has caused the task to die without notifying srun,
1336       # and we'll kill srun ourselves.
1337       $jobstep->{killtime} = time + 30;
1338       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1339     }
1340   }
1341 }
1342
1343
1344 sub release_allocation
1345 {
1346   if ($have_slurm)
1347   {
1348     Log (undef, "release job allocation");
1349     system "scancel $ENV{SLURM_JOB_ID}";
1350   }
1351 }
1352
1353
1354 sub readfrompipes
1355 {
1356   my $gotsome = 0;
1357   foreach my $job (keys %reader)
1358   {
1359     my $buf;
1360     while (0 < sysread ($reader{$job}, $buf, 8192))
1361     {
1362       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1363       $jobstep[$job]->{stderr_at} = time;
1364       $jobstep[$job]->{stderr} .= $buf;
1365       preprocess_stderr ($job);
1366       if (length ($jobstep[$job]->{stderr}) > 16384)
1367       {
1368         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1369       }
1370       $gotsome = 1;
1371     }
1372   }
1373   return $gotsome;
1374 }
1375
1376
1377 sub preprocess_stderr
1378 {
1379   my $job = shift;
1380
1381   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1382     my $line = $1;
1383     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1384     Log ($job, "stderr $line");
1385     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1386       # whoa.
1387       $main::please_freeze = 1;
1388     }
1389     elsif ($line =~ /srun: error: Node failure on/) {
1390       my $job_slot_index = $jobstep[$job]->{slotindex};
1391       $slot[$job_slot_index]->{node}->{fail_count}++;
1392       $jobstep[$job]->{tempfail} = 1;
1393       ban_node_by_slot($job_slot_index);
1394     }
1395     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1396       $jobstep[$job]->{tempfail} = 1;
1397       ban_node_by_slot($jobstep[$job]->{slotindex});
1398     }
1399     elsif ($line =~ /arvados\.errors\.Keep/) {
1400       $jobstep[$job]->{tempfail} = 1;
1401     }
1402   }
1403 }
1404
1405
1406 sub process_stderr
1407 {
1408   my $job = shift;
1409   my $task_success = shift;
1410   preprocess_stderr ($job);
1411
1412   map {
1413     Log ($job, "stderr $_");
1414   } split ("\n", $jobstep[$job]->{stderr});
1415 }
1416
1417 sub fetch_block
1418 {
1419   my $hash = shift;
1420   my $keep;
1421   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1422     Log(undef, "fetch_block run error from arv-get $hash: $!");
1423     return undef;
1424   }
1425   my $output_block = "";
1426   while (1) {
1427     my $buf;
1428     my $bytes = sysread($keep, $buf, 1024 * 1024);
1429     if (!defined $bytes) {
1430       Log(undef, "fetch_block read error from arv-get: $!");
1431       $output_block = undef;
1432       last;
1433     } elsif ($bytes == 0) {
1434       # sysread returns 0 at the end of the pipe.
1435       last;
1436     } else {
1437       # some bytes were read into buf.
1438       $output_block .= $buf;
1439     }
1440   }
1441   close $keep;
1442   if ($?) {
1443     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1444     $output_block = undef;
1445   }
1446   return $output_block;
1447 }
1448
1449 # Create a collection by concatenating the output of all tasks (each
1450 # task's output is either a manifest fragment, a locator for a
1451 # manifest fragment stored in Keep, or nothing at all). Return the
1452 # portable_data_hash of the new collection.
1453 sub create_output_collection
1454 {
1455   Log (undef, "collate");
1456
1457   my ($child_out, $child_in);
1458   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1459 import arvados
1460 import sys
1461 print (arvados.api("v1").collections().
1462        create(body={"manifest_text": sys.stdin.read()}).
1463        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1464 }, retry_count());
1465
1466   my $task_idx = -1;
1467   my $manifest_size = 0;
1468   for (@jobstep)
1469   {
1470     ++$task_idx;
1471     my $output = $_->{'arvados_task'}->{output};
1472     next if (!defined($output));
1473     my $next_write;
1474     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1475       $next_write = fetch_block($output);
1476     } else {
1477       $next_write = $output;
1478     }
1479     if (defined($next_write)) {
1480       if (!defined(syswrite($child_in, $next_write))) {
1481         # There's been an error writing.  Stop the loop.
1482         # We'll log details about the exit code later.
1483         last;
1484       } else {
1485         $manifest_size += length($next_write);
1486       }
1487     } else {
1488       my $uuid = $_->{'arvados_task'}->{'uuid'};
1489       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1490       $main::success = 0;
1491     }
1492   }
1493   close($child_in);
1494   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1495
1496   my $joboutput;
1497   my $s = IO::Select->new($child_out);
1498   if ($s->can_read(120)) {
1499     sysread($child_out, $joboutput, 1024 * 1024);
1500     waitpid($pid, 0);
1501     if ($?) {
1502       Log(undef, "output collection creation exited " . exit_status_s($?));
1503       $joboutput = undef;
1504     } else {
1505       chomp($joboutput);
1506     }
1507   } else {
1508     Log (undef, "timed out while creating output collection");
1509     foreach my $signal (2, 2, 2, 15, 15, 9) {
1510       kill($signal, $pid);
1511       last if waitpid($pid, WNOHANG) == -1;
1512       sleep(1);
1513     }
1514   }
1515   close($child_out);
1516
1517   return $joboutput;
1518 }
1519
1520 # Calls create_output_collection, logs the result, and returns it.
1521 # If that was successful, save that as the output in the job record.
1522 sub save_output_collection {
1523   my $collated_output = create_output_collection();
1524
1525   if (!$collated_output) {
1526     Log(undef, "Failed to write output collection");
1527   }
1528   else {
1529     Log(undef, "job output $collated_output");
1530     $Job->update_attributes('output' => $collated_output);
1531   }
1532   return $collated_output;
1533 }
1534
1535 sub killem
1536 {
1537   foreach (@_)
1538   {
1539     my $sig = 2;                # SIGINT first
1540     if (exists $proc{$_}->{"sent_$sig"} &&
1541         time - $proc{$_}->{"sent_$sig"} > 4)
1542     {
1543       $sig = 15;                # SIGTERM if SIGINT doesn't work
1544     }
1545     if (exists $proc{$_}->{"sent_$sig"} &&
1546         time - $proc{$_}->{"sent_$sig"} > 4)
1547     {
1548       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1549     }
1550     if (!exists $proc{$_}->{"sent_$sig"})
1551     {
1552       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1553       kill $sig, $_;
1554       select (undef, undef, undef, 0.1);
1555       if ($sig == 2)
1556       {
1557         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1558       }
1559       $proc{$_}->{"sent_$sig"} = time;
1560       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1561     }
1562   }
1563 }
1564
1565
1566 sub fhbits
1567 {
1568   my($bits);
1569   for (@_) {
1570     vec($bits,fileno($_),1) = 1;
1571   }
1572   $bits;
1573 }
1574
1575
1576 # Send log output to Keep via arv-put.
1577 #
1578 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1579 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1580 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1581 # $log_pipe_pid is the pid of the arv-put subprocess.
1582 #
1583 # The only functions that should access these variables directly are:
1584 #
1585 # log_writer_start($logfilename)
1586 #     Starts an arv-put pipe, reading data on stdin and writing it to
1587 #     a $logfilename file in an output collection.
1588 #
1589 # log_writer_read_output([$timeout])
1590 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1591 #     Passes $timeout to the select() call, with a default of 0.01.
1592 #     Returns the result of the last read() call on $log_pipe_out, or
1593 #     -1 if read() wasn't called because select() timed out.
1594 #     Only other log_writer_* functions should need to call this.
1595 #
1596 # log_writer_send($txt)
1597 #     Writes $txt to the output log collection.
1598 #
1599 # log_writer_finish()
1600 #     Closes the arv-put pipe and returns the output that it produces.
1601 #
1602 # log_writer_is_active()
1603 #     Returns a true value if there is currently a live arv-put
1604 #     process, false otherwise.
1605 #
1606 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1607     $log_pipe_pid);
1608
1609 sub log_writer_start($)
1610 {
1611   my $logfilename = shift;
1612   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1613                         'arv-put',
1614                         '--stream',
1615                         '--retries', '3',
1616                         '--filename', $logfilename,
1617                         '-');
1618   $log_pipe_out_buf = "";
1619   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1620 }
1621
1622 sub log_writer_read_output {
1623   my $timeout = shift || 0.01;
1624   my $read = -1;
1625   while ($read && $log_pipe_out_select->can_read($timeout)) {
1626     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1627                  length($log_pipe_out_buf));
1628   }
1629   if (!defined($read)) {
1630     Log(undef, "error reading log manifest from arv-put: $!");
1631   }
1632   return $read;
1633 }
1634
1635 sub log_writer_send($)
1636 {
1637   my $txt = shift;
1638   print $log_pipe_in $txt;
1639   log_writer_read_output();
1640 }
1641
1642 sub log_writer_finish()
1643 {
1644   return unless $log_pipe_pid;
1645
1646   close($log_pipe_in);
1647
1648   my $read_result = log_writer_read_output(120);
1649   if ($read_result == -1) {
1650     Log (undef, "timed out reading from 'arv-put'");
1651   } elsif ($read_result != 0) {
1652     Log(undef, "failed to read arv-put log manifest to EOF");
1653   }
1654
1655   waitpid($log_pipe_pid, 0);
1656   if ($?) {
1657     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1658   }
1659
1660   close($log_pipe_out);
1661   my $arv_put_output = $log_pipe_out_buf;
1662   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1663       $log_pipe_out_select = undef;
1664
1665   return $arv_put_output;
1666 }
1667
1668 sub log_writer_is_active() {
1669   return $log_pipe_pid;
1670 }
1671
1672 sub Log                         # ($jobstep_id, $logmessage)
1673 {
1674   if ($_[1] =~ /\n/) {
1675     for my $line (split (/\n/, $_[1])) {
1676       Log ($_[0], $line);
1677     }
1678     return;
1679   }
1680   my $fh = select STDERR; $|=1; select $fh;
1681   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1682   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1683   $message .= "\n";
1684   my $datetime;
1685   if (log_writer_is_active() || -t STDERR) {
1686     my @gmtime = gmtime;
1687     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1688                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1689   }
1690   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1691
1692   if (log_writer_is_active()) {
1693     log_writer_send($datetime . " " . $message);
1694   }
1695 }
1696
1697
1698 sub croak
1699 {
1700   my ($package, $file, $line) = caller;
1701   my $message = "@_ at $file line $line\n";
1702   Log (undef, $message);
1703   freeze() if @jobstep_todo;
1704   create_output_collection() if @jobstep_todo;
1705   cleanup();
1706   save_meta();
1707   die;
1708 }
1709
1710
1711 sub cleanup
1712 {
1713   return unless $Job;
1714   if ($Job->{'state'} eq 'Cancelled') {
1715     $Job->update_attributes('finished_at' => scalar gmtime);
1716   } else {
1717     $Job->update_attributes('state' => 'Failed');
1718   }
1719 }
1720
1721
1722 sub save_meta
1723 {
1724   my $justcheckpoint = shift; # false if this will be the last meta saved
1725   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1726   return unless log_writer_is_active();
1727
1728   my $log_manifest = "";
1729   if ($Job->{log}) {
1730     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1731     $log_manifest .= $prev_log_coll->{manifest_text};
1732   }
1733   $log_manifest .= log_writer_finish();
1734
1735   my $log_coll = api_call(
1736     "collections/create", ensure_unique_name => 1, collection => {
1737       manifest_text => $log_manifest,
1738       owner_uuid => $Job->{owner_uuid},
1739       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1740     });
1741   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1742   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1743 }
1744
1745
1746 sub freeze_if_want_freeze
1747 {
1748   if ($main::please_freeze)
1749   {
1750     release_allocation();
1751     if (@_)
1752     {
1753       # kill some srun procs before freeze+stop
1754       map { $proc{$_} = {} } @_;
1755       while (%proc)
1756       {
1757         killem (keys %proc);
1758         select (undef, undef, undef, 0.1);
1759         my $died;
1760         while (($died = waitpid (-1, WNOHANG)) > 0)
1761         {
1762           delete $proc{$died};
1763         }
1764       }
1765     }
1766     freeze();
1767     create_output_collection();
1768     cleanup();
1769     save_meta();
1770     exit 1;
1771   }
1772 }
1773
1774
1775 sub freeze
1776 {
1777   Log (undef, "Freeze not implemented");
1778   return;
1779 }
1780
1781
1782 sub thaw
1783 {
1784   croak ("Thaw not implemented");
1785 }
1786
1787
1788 sub freezequote
1789 {
1790   my $s = shift;
1791   $s =~ s/\\/\\\\/g;
1792   $s =~ s/\n/\\n/g;
1793   return $s;
1794 }
1795
1796
1797 sub freezeunquote
1798 {
1799   my $s = shift;
1800   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1801   return $s;
1802 }
1803
1804
1805 sub srun
1806 {
1807   my $srunargs = shift;
1808   my $execargs = shift;
1809   my $opts = shift || {};
1810   my $stdin = shift;
1811   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1812
1813   $Data::Dumper::Terse = 1;
1814   $Data::Dumper::Indent = 0;
1815   my $show_cmd = Dumper($args);
1816   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1817   $show_cmd =~ s/\n/ /g;
1818   if ($opts->{fork}) {
1819     Log(undef, "starting: $show_cmd");
1820   } else {
1821     # This is a child process: parent is in charge of reading our
1822     # stderr and copying it to Log() if needed.
1823     warn "starting: $show_cmd\n";
1824   }
1825
1826   if (defined $stdin) {
1827     my $child = open STDIN, "-|";
1828     defined $child or die "no fork: $!";
1829     if ($child == 0) {
1830       print $stdin or die $!;
1831       close STDOUT or die $!;
1832       exit 0;
1833     }
1834   }
1835
1836   return system (@$args) if $opts->{fork};
1837
1838   exec @$args;
1839   warn "ENV size is ".length(join(" ",%ENV));
1840   die "exec failed: $!: @$args";
1841 }
1842
1843
1844 sub ban_node_by_slot {
1845   # Don't start any new jobsteps on this node for 60 seconds
1846   my $slotid = shift;
1847   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1848   $slot[$slotid]->{node}->{hold_count}++;
1849   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1850 }
1851
1852 sub must_lock_now
1853 {
1854   my ($lockfile, $error_message) = @_;
1855   open L, ">", $lockfile or croak("$lockfile: $!");
1856   if (!flock L, LOCK_EX|LOCK_NB) {
1857     croak("Can't lock $lockfile: $error_message\n");
1858   }
1859 }
1860
1861 sub find_docker_image {
1862   # Given a Keep locator, check to see if it contains a Docker image.
1863   # If so, return its stream name and Docker hash.
1864   # If not, return undef for both values.
1865   my $locator = shift;
1866   my ($streamname, $filename);
1867   my $image = api_call("collections/get", uuid => $locator);
1868   if ($image) {
1869     foreach my $line (split(/\n/, $image->{manifest_text})) {
1870       my @tokens = split(/\s+/, $line);
1871       next if (!@tokens);
1872       $streamname = shift(@tokens);
1873       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1874         if (defined($filename)) {
1875           return (undef, undef);  # More than one file in the Collection.
1876         } else {
1877           $filename = (split(/:/, $filedata, 3))[2];
1878         }
1879       }
1880     }
1881   }
1882   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1883     return ($streamname, $1);
1884   } else {
1885     return (undef, undef);
1886   }
1887 }
1888
1889 sub retry_count {
1890   # Calculate the number of times an operation should be retried,
1891   # assuming exponential backoff, and that we're willing to retry as
1892   # long as tasks have been running.  Enforce a minimum of 3 retries.
1893   my ($starttime, $endtime, $timediff, $retries);
1894   if (@jobstep) {
1895     $starttime = $jobstep[0]->{starttime};
1896     $endtime = $jobstep[-1]->{finishtime};
1897   }
1898   if (!defined($starttime)) {
1899     $timediff = 0;
1900   } elsif (!defined($endtime)) {
1901     $timediff = time - $starttime;
1902   } else {
1903     $timediff = ($endtime - $starttime) - (time - $endtime);
1904   }
1905   if ($timediff > 0) {
1906     $retries = int(log($timediff) / log(2));
1907   } else {
1908     $retries = 1;  # Use the minimum.
1909   }
1910   return ($retries > 3) ? $retries : 3;
1911 }
1912
1913 sub retry_op {
1914   # Pass in two function references.
1915   # This method will be called with the remaining arguments.
1916   # If it dies, retry it with exponential backoff until it succeeds,
1917   # or until the current retry_count is exhausted.  After each failure
1918   # that can be retried, the second function will be called with
1919   # the current try count (0-based), next try time, and error message.
1920   my $operation = shift;
1921   my $retry_callback = shift;
1922   my $retries = retry_count();
1923   foreach my $try_count (0..$retries) {
1924     my $next_try = time + (2 ** $try_count);
1925     my $result = eval { $operation->(@_); };
1926     if (!$@) {
1927       return $result;
1928     } elsif ($try_count < $retries) {
1929       $retry_callback->($try_count, $next_try, $@);
1930       my $sleep_time = $next_try - time;
1931       sleep($sleep_time) if ($sleep_time > 0);
1932     }
1933   }
1934   # Ensure the error message ends in a newline, so Perl doesn't add
1935   # retry_op's line number to it.
1936   chomp($@);
1937   die($@ . "\n");
1938 }
1939
1940 sub api_call {
1941   # Pass in a /-separated API method name, and arguments for it.
1942   # This function will call that method, retrying as needed until
1943   # the current retry_count is exhausted, with a log on the first failure.
1944   my $method_name = shift;
1945   my $log_api_retry = sub {
1946     my ($try_count, $next_try_at, $errmsg) = @_;
1947     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1948     $errmsg =~ s/\s/ /g;
1949     $errmsg =~ s/\s+$//;
1950     my $retry_msg;
1951     if ($next_try_at < time) {
1952       $retry_msg = "Retrying.";
1953     } else {
1954       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1955       $retry_msg = "Retrying at $next_try_fmt.";
1956     }
1957     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1958   };
1959   my $method = $arv;
1960   foreach my $key (split(/\//, $method_name)) {
1961     $method = $method->{$key};
1962   }
1963   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1964 }
1965
1966 sub exit_status_s {
1967   # Given a $?, return a human-readable exit code string like "0" or
1968   # "1" or "0 with signal 1" or "1 with signal 11".
1969   my $exitcode = shift;
1970   my $s = $exitcode >> 8;
1971   if ($exitcode & 0x7f) {
1972     $s .= " with signal " . ($exitcode & 0x7f);
1973   }
1974   if ($exitcode & 0x80) {
1975     $s .= " with core dump";
1976   }
1977   return $s;
1978 }
1979
1980 sub handle_readall {
1981   # Pass in a glob reference to a file handle.
1982   # Read all its contents and return them as a string.
1983   my $fh_glob_ref = shift;
1984   local $/ = undef;
1985   return <$fh_glob_ref>;
1986 }
1987
1988 sub tar_filename_n {
1989   my $n = shift;
1990   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1991 }
1992
1993 sub add_git_archive {
1994   # Pass in a git archive command as a string or list, a la system().
1995   # This method will save its output to be included in the archive sent to the
1996   # build script.
1997   my $git_input;
1998   $git_tar_count++;
1999   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2000     croak("Failed to save git archive: $!");
2001   }
2002   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2003   close($git_input);
2004   waitpid($git_pid, 0);
2005   close(GIT_ARCHIVE);
2006   if ($?) {
2007     croak("Failed to save git archive: git exited " . exit_status_s($?));
2008   }
2009 }
2010
2011 sub combined_git_archive {
2012   # Combine all saved tar archives into a single archive, then return its
2013   # contents in a string.  Return undef if no archives have been saved.
2014   if ($git_tar_count < 1) {
2015     return undef;
2016   }
2017   my $base_tar_name = tar_filename_n(1);
2018   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2019     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2020     if ($tar_exit != 0) {
2021       croak("Error preparing build archive: tar -A exited " .
2022             exit_status_s($tar_exit));
2023     }
2024   }
2025   if (!open(GIT_TAR, "<", $base_tar_name)) {
2026     croak("Could not open build archive: $!");
2027   }
2028   my $tar_contents = handle_readall(\*GIT_TAR);
2029   close(GIT_TAR);
2030   return $tar_contents;
2031 }
2032
2033 sub set_nonblocking {
2034   my $fh = shift;
2035   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2036   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2037 }
2038
2039 __DATA__
2040 #!/usr/bin/env perl
2041 #
2042 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2043 # server invokes this script on individual compute nodes, or localhost if we're
2044 # running a job locally.  It gets called in two modes:
2045 #
2046 # * No arguments: Installation mode.  Read a tar archive from the DATA
2047 #   file handle; it includes the Crunch script's source code, and
2048 #   maybe SDKs as well.  Those should be installed in the proper
2049 #   locations.  This runs outside of any Docker container, so don't try to
2050 #   introspect Crunch's runtime environment.
2051 #
2052 # * With arguments: Crunch script run mode.  This script should set up the
2053 #   environment, then run the command specified in the arguments.  This runs
2054 #   inside any Docker container.
2055
2056 use Fcntl ':flock';
2057 use File::Path qw( make_path remove_tree );
2058 use POSIX qw(getcwd);
2059
2060 use constant TASK_TEMPFAIL => 111;
2061
2062 # Map SDK subdirectories to the path environments they belong to.
2063 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2064
2065 my $destdir = $ENV{"CRUNCH_SRC"};
2066 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2067 my $repo = $ENV{"CRUNCH_SRC_URL"};
2068 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2069 my $job_work = $ENV{"JOB_WORK"};
2070 my $task_work = $ENV{"TASK_WORK"};
2071
2072 open(STDOUT_ORIG, ">&", STDOUT);
2073 open(STDERR_ORIG, ">&", STDERR);
2074
2075 for my $dir ($destdir, $job_work, $task_work) {
2076   if ($dir) {
2077     make_path $dir;
2078     -e $dir or die "Failed to create temporary directory ($dir): $!";
2079   }
2080 }
2081
2082 if ($task_work) {
2083   remove_tree($task_work, {keep_root => 1});
2084 }
2085
2086 ### Crunch script run mode
2087 if (@ARGV) {
2088   # We want to do routine logging during task 0 only.  This gives the user
2089   # the information they need, but avoids repeating the information for every
2090   # task.
2091   my $Log;
2092   if ($ENV{TASK_SEQUENCE} eq "0") {
2093     $Log = sub {
2094       my $msg = shift;
2095       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2096     };
2097   } else {
2098     $Log = sub { };
2099   }
2100
2101   my $python_src = "$install_dir/python";
2102   my $venv_dir = "$job_work/.arvados.venv";
2103   my $venv_built = -e "$venv_dir/bin/activate";
2104   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2105     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2106                  "--python=python2.7", $venv_dir);
2107     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2108     $venv_built = 1;
2109     $Log->("Built Python SDK virtualenv");
2110   }
2111
2112   my $pip_bin = "pip";
2113   if ($venv_built) {
2114     $Log->("Running in Python SDK virtualenv");
2115     $pip_bin = "$venv_dir/bin/pip";
2116     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2117     @ARGV = ("/bin/sh", "-ec",
2118              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2119   } elsif (-d $python_src) {
2120     $Log->("Warning: virtualenv not found inside Docker container default " .
2121            "\$PATH. Can't install Python SDK.");
2122   }
2123
2124   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2125   if ($pkgs) {
2126     $Log->("Using Arvados SDK:");
2127     foreach my $line (split /\n/, $pkgs) {
2128       $Log->($line);
2129     }
2130   } else {
2131     $Log->("Arvados SDK packages not found");
2132   }
2133
2134   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2135     my $sdk_path = "$install_dir/$sdk_dir";
2136     if (-d $sdk_path) {
2137       if ($ENV{$sdk_envkey}) {
2138         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2139       } else {
2140         $ENV{$sdk_envkey} = $sdk_path;
2141       }
2142       $Log->("Arvados SDK added to %s", $sdk_envkey);
2143     }
2144   }
2145
2146   exec(@ARGV);
2147   die "Cannot exec `@ARGV`: $!";
2148 }
2149
2150 ### Installation mode
2151 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2152 flock L, LOCK_EX;
2153 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2154   # This exact git archive (source + arvados sdk) is already installed
2155   # here, so there's no need to reinstall it.
2156
2157   # We must consume our DATA section, though: otherwise the process
2158   # feeding it to us will get SIGPIPE.
2159   my $buf;
2160   while (read(DATA, $buf, 65536)) { }
2161
2162   exit(0);
2163 }
2164
2165 unlink "$destdir.archive_hash";
2166 mkdir $destdir;
2167
2168 do {
2169   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2170   local $SIG{PIPE} = "IGNORE";
2171   warn "Extracting archive: $archive_hash\n";
2172   # --ignore-zeros is necessary sometimes: depending on how much NUL
2173   # padding tar -A put on our combined archive (which in turn depends
2174   # on the length of the component archives) tar without
2175   # --ignore-zeros will exit before consuming stdin and cause close()
2176   # to fail on the resulting SIGPIPE.
2177   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2178     die "Error launching 'tar -xC $destdir': $!";
2179   }
2180   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2181   # get SIGPIPE.  We must feed it data incrementally.
2182   my $tar_input;
2183   while (read(DATA, $tar_input, 65536)) {
2184     print TARX $tar_input;
2185   }
2186   if(!close(TARX)) {
2187     die "'tar -xC $destdir' exited $?: $!";
2188   }
2189 };
2190
2191 mkdir $install_dir;
2192
2193 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2194 if (-d $sdk_root) {
2195   foreach my $sdk_lang (("python",
2196                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2197     if (-d "$sdk_root/$sdk_lang") {
2198       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2199         die "Failed to install $sdk_lang SDK: $!";
2200       }
2201     }
2202   }
2203 }
2204
2205 my $python_dir = "$install_dir/python";
2206 if ((-d $python_dir) and can_run("python2.7")) {
2207   open(my $egg_info_pipe, "-|",
2208        "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null");
2209   my @egg_info_errors = <$egg_info_pipe>;
2210   close($egg_info_pipe);
2211   if ($?) {
2212     if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) {
2213       # egg_info apparently failed because it couldn't ask git for a build tag.
2214       # Specify no build tag.
2215       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2216       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2217       close($pysdk_cfg);
2218     } else {
2219       my $egg_info_exit = $? >> 8;
2220       foreach my $errline (@egg_info_errors) {
2221         print STDERR_ORIG $errline;
2222       }
2223       warn "python setup.py egg_info failed: exit $egg_info_exit";
2224       exit ($egg_info_exit || 1);
2225     }
2226   }
2227 }
2228
2229 # Hide messages from the install script (unless it fails: shell_or_die
2230 # will show $destdir.log in that case).
2231 open(STDOUT, ">>", "$destdir.log");
2232 open(STDERR, ">&", STDOUT);
2233
2234 if (-e "$destdir/crunch_scripts/install") {
2235     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2236 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2237     # Old version
2238     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2239 } elsif (-e "./install.sh") {
2240     shell_or_die (undef, "./install.sh", $install_dir);
2241 }
2242
2243 if ($archive_hash) {
2244     unlink "$destdir.archive_hash.new";
2245     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2246     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2247 }
2248
2249 close L;
2250
2251 sub can_run {
2252   my $command_name = shift;
2253   open(my $which, "-|", "which", $command_name);
2254   while (<$which>) { }
2255   close($which);
2256   return ($? == 0);
2257 }
2258
2259 sub shell_or_die
2260 {
2261   my $exitcode = shift;
2262
2263   if ($ENV{"DEBUG"}) {
2264     print STDERR "@_\n";
2265   }
2266   if (system (@_) != 0) {
2267     my $err = $!;
2268     my $code = $?;
2269     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2270     open STDERR, ">&STDERR_ORIG";
2271     system ("cat $destdir.log >&2");
2272     warn "@_ failed ($err): $exitstatus";
2273     if (defined($exitcode)) {
2274       exit $exitcode;
2275     }
2276     else {
2277       exit (($code >> 8) || 1);
2278     }
2279   }
2280 }
2281
2282 __DATA__