Merge branch 'master' into 3198-writable-fuse
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105   if ($ENV{"USER"} ne "crunch" && $< != 0) {
106     # use a tmp dir unique for my uid
107     $ENV{"CRUNCH_TMP"} .= "-$<";
108   }
109 }
110
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
114 }
115
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
120
121 my %proc;
122 my $force_unlock;
123 my $git_dir;
124 my $jobspec;
125 my $job_api_token;
126 my $no_clear_tmp;
127 my $resume_stash;
128 my $docker_bin = "/usr/bin/docker.io";
129 GetOptions('force-unlock' => \$force_unlock,
130            'git-dir=s' => \$git_dir,
131            'job=s' => \$jobspec,
132            'job-api-token=s' => \$job_api_token,
133            'no-clear-tmp' => \$no_clear_tmp,
134            'resume-stash=s' => \$resume_stash,
135            'docker-bin=s' => \$docker_bin,
136     );
137
138 if (defined $job_api_token) {
139   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
140 }
141
142 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
143
144
145 $SIG{'USR1'} = sub
146 {
147   $main::ENV{CRUNCH_DEBUG} = 1;
148 };
149 $SIG{'USR2'} = sub
150 {
151   $main::ENV{CRUNCH_DEBUG} = 0;
152 };
153
154 my $arv = Arvados->new('apiVersion' => 'v1');
155
156 my $Job;
157 my $job_id;
158 my $dbh;
159 my $sth;
160 my @jobstep;
161
162 my $local_job;
163 if ($jobspec =~ /^[-a-z\d]+$/)
164 {
165   # $jobspec is an Arvados UUID, not a JSON job specification
166   $Job = api_call("jobs/get", uuid => $jobspec);
167   $local_job = 0;
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172   $local_job = 1;
173 }
174
175
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely
178 # misconfigured.
179 my $cmd = ['true'];
180 if ($Job->{docker_image_locator}) {
181   $cmd = [$docker_bin, 'ps', '-q'];
182 }
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
185      $cmd,
186      {fork => 1});
187 if ($? != 0) {
188   Log(undef, "Sanity check failed: ".exit_status_s($?));
189   exit EX_TEMPFAIL;
190 }
191 Log(undef, "Sanity check OK");
192
193
194 my $User = api_call("users/current");
195
196 if (!$local_job) {
197   if (!$force_unlock) {
198     # Claim this job, and make sure nobody else does
199     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
200     if ($@) {
201       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
202       exit EX_TEMPFAIL;
203     };
204   }
205 }
206 else
207 {
208   if (!$resume_stash)
209   {
210     map { croak ("No $_ specified") unless $Job->{$_} }
211     qw(script script_version script_parameters);
212   }
213
214   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
215   $Job->{'started_at'} = gmtime;
216   $Job->{'state'} = 'Running';
217
218   $Job = api_call("jobs/create", job => $Job);
219 }
220 $job_id = $Job->{'uuid'};
221
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
224
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
228
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
230 if ($? == 0) {
231   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232   chomp($gem_versions);
233   chop($gem_versions);  # Closing parentheses
234 } else {
235   $gem_versions = "";
236 }
237 Log(undef,
238     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
239
240 Log (undef, "check slurm allocation");
241 my @slot;
242 my @node;
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
244 my @sinfo;
245 if (!$have_slurm)
246 {
247   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248   push @sinfo, "$localcpus localhost";
249 }
250 if (exists $ENV{SLURM_NODELIST})
251 {
252   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
253 }
254 foreach (@sinfo)
255 {
256   my ($ncpus, $slurm_nodelist) = split;
257   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
258
259   my @nodelist;
260   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
261   {
262     my $nodelist = $1;
263     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
264     {
265       my $ranges = $1;
266       foreach (split (",", $ranges))
267       {
268         my ($a, $b);
269         if (/(\d+)-(\d+)/)
270         {
271           $a = $1;
272           $b = $2;
273         }
274         else
275         {
276           $a = $_;
277           $b = $_;
278         }
279         push @nodelist, map {
280           my $n = $nodelist;
281           $n =~ s/\[[-,\d]+\]/$_/;
282           $n;
283         } ($a..$b);
284       }
285     }
286     else
287     {
288       push @nodelist, $nodelist;
289     }
290   }
291   foreach my $nodename (@nodelist)
292   {
293     Log (undef, "node $nodename - $ncpus slots");
294     my $node = { name => $nodename,
295                  ncpus => $ncpus,
296                  losing_streak => 0,
297                  hold_until => 0 };
298     foreach my $cpu (1..$ncpus)
299     {
300       push @slot, { node => $node,
301                     cpu => $cpu };
302     }
303   }
304   push @node, @nodelist;
305 }
306
307
308
309 # Ensure that we get one jobstep running on each allocated node before
310 # we start overloading nodes with concurrent steps
311
312 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
313
314
315 $Job->update_attributes(
316   'tasks_summary' => { 'failed' => 0,
317                        'todo' => 1,
318                        'running' => 0,
319                        'done' => 0 });
320
321 Log (undef, "start");
322 $SIG{'INT'} = sub { $main::please_freeze = 1; };
323 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
324 $SIG{'TERM'} = \&croak;
325 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
326 $SIG{'ALRM'} = sub { $main::please_info = 1; };
327 $SIG{'CONT'} = sub { $main::please_continue = 1; };
328 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
329
330 $main::please_freeze = 0;
331 $main::please_info = 0;
332 $main::please_continue = 0;
333 $main::please_refresh = 0;
334 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
335
336 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
337 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
338 $ENV{"JOB_UUID"} = $job_id;
339
340
341 my @jobstep_todo = ();
342 my @jobstep_done = ();
343 my @jobstep_tomerge = ();
344 my $jobstep_tomerge_level = 0;
345 my $squeue_checked = 0;
346 my $latest_refresh = scalar time;
347
348
349
350 if (defined $Job->{thawedfromkey})
351 {
352   thaw ($Job->{thawedfromkey});
353 }
354 else
355 {
356   my $first_task = api_call("job_tasks/create", job_task => {
357     'job_uuid' => $Job->{'uuid'},
358     'sequence' => 0,
359     'qsequence' => 0,
360     'parameters' => {},
361   });
362   push @jobstep, { 'level' => 0,
363                    'failures' => 0,
364                    'arvados_task' => $first_task,
365                  };
366   push @jobstep_todo, 0;
367 }
368
369
370 if (!$have_slurm)
371 {
372   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
373 }
374
375 my $build_script = handle_readall(\*DATA);
376 my $nodelist = join(",", @node);
377 my $git_tar_count = 0;
378
379 if (!defined $no_clear_tmp) {
380   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
381   Log (undef, "Clean work dirs");
382
383   my $cleanpid = fork();
384   if ($cleanpid == 0)
385   {
386     # Find FUSE mounts that look like Keep mounts (the mount path has the
387     # word "keep") and unmount them.  Then clean up work directories.
388     # TODO: When #5036 is done and widely deployed, we can get rid of the
389     # regular expression and just unmount everything with type fuse.keep.
390     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
391           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
392     exit (1);
393   }
394   while (1)
395   {
396     last if $cleanpid == waitpid (-1, WNOHANG);
397     freeze_if_want_freeze ($cleanpid);
398     select (undef, undef, undef, 0.1);
399   }
400   Log (undef, "Cleanup command exited ".exit_status_s($?));
401 }
402
403 # If this job requires a Docker image, install that.
404 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
405 if ($docker_locator = $Job->{docker_image_locator}) {
406   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
407   if (!$docker_hash)
408   {
409     croak("No Docker image hash found from locator $docker_locator");
410   }
411   $docker_stream =~ s/^\.//;
412   my $docker_install_script = qq{
413 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
414     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
415 fi
416 };
417   my $docker_pid = fork();
418   if ($docker_pid == 0)
419   {
420     srun (["srun", "--nodelist=" . join(',', @node)],
421           ["/bin/sh", "-ec", $docker_install_script]);
422     exit ($?);
423   }
424   while (1)
425   {
426     last if $docker_pid == waitpid (-1, WNOHANG);
427     freeze_if_want_freeze ($docker_pid);
428     select (undef, undef, undef, 0.1);
429   }
430   if ($? != 0)
431   {
432     croak("Installing Docker image from $docker_locator exited "
433           .exit_status_s($?));
434   }
435
436   # Determine whether this version of Docker supports memory+swap limits.
437   srun(["srun", "--nodelist=" . $node[0]],
438        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
439       {fork => 1});
440   $docker_limitmem = ($? == 0);
441
442   if ($Job->{arvados_sdk_version}) {
443     # The job also specifies an Arvados SDK version.  Add the SDKs to the
444     # tar file for the build script to install.
445     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
446                        $Job->{arvados_sdk_version}));
447     add_git_archive("git", "--git-dir=$git_dir", "archive",
448                     "--prefix=.arvados.sdk/",
449                     $Job->{arvados_sdk_version}, "sdk");
450   }
451 }
452
453 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
454   # If script_version looks like an absolute path, *and* the --git-dir
455   # argument was not given -- which implies we were not invoked by
456   # crunch-dispatch -- we will use the given path as a working
457   # directory instead of resolving script_version to a git commit (or
458   # doing anything else with git).
459   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
460   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
461 }
462 else {
463   # Resolve the given script_version to a git commit sha1. Also, if
464   # the repository is remote, clone it into our local filesystem: this
465   # ensures "git archive" will work, and is necessary to reliably
466   # resolve a symbolic script_version like "master^".
467   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
468
469   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
470
471   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
472
473   # If we're running under crunch-dispatch, it will have already
474   # pulled the appropriate source tree into its own repository, and
475   # given us that repo's path as $git_dir.
476   #
477   # If we're running a "local" job, we might have to fetch content
478   # from a remote repository.
479   #
480   # (Currently crunch-dispatch gives a local path with --git-dir, but
481   # we might as well accept URLs there too in case it changes its
482   # mind.)
483   my $repo = $git_dir || $Job->{'repository'};
484
485   # Repository can be remote or local. If remote, we'll need to fetch it
486   # to a local dir before doing `git log` et al.
487   my $repo_location;
488
489   if ($repo =~ m{://|^[^/]*:}) {
490     # $repo is a git url we can clone, like git:// or https:// or
491     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
492     # not recognized here because distinguishing that from a local
493     # path is too fragile. If you really need something strange here,
494     # use the ssh:// form.
495     $repo_location = 'remote';
496   } elsif ($repo =~ m{^\.*/}) {
497     # $repo is a local path to a git index. We'll also resolve ../foo
498     # to ../foo/.git if the latter is a directory. To help
499     # disambiguate local paths from named hosted repositories, this
500     # form must be given as ./ or ../ if it's a relative path.
501     if (-d "$repo/.git") {
502       $repo = "$repo/.git";
503     }
504     $repo_location = 'local';
505   } else {
506     # $repo is none of the above. It must be the name of a hosted
507     # repository.
508     my $arv_repo_list = api_call("repositories/list",
509                                  'filters' => [['name','=',$repo]]);
510     my @repos_found = @{$arv_repo_list->{'items'}};
511     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
512     if ($n_found > 0) {
513       Log(undef, "Repository '$repo' -> "
514           . join(", ", map { $_->{'uuid'} } @repos_found));
515     }
516     if ($n_found != 1) {
517       croak("Error: Found $n_found repositories with name '$repo'.");
518     }
519     $repo = $repos_found[0]->{'fetch_url'};
520     $repo_location = 'remote';
521   }
522   Log(undef, "Using $repo_location repository '$repo'");
523   $ENV{"CRUNCH_SRC_URL"} = $repo;
524
525   # Resolve given script_version (we'll call that $treeish here) to a
526   # commit sha1 ($commit).
527   my $treeish = $Job->{'script_version'};
528   my $commit;
529   if ($repo_location eq 'remote') {
530     # We minimize excess object-fetching by re-using the same bare
531     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
532     # just keep adding remotes to it as needed.
533     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
534     my $gitcmd = "git --git-dir=\Q$local_repo\E";
535
536     # Set up our local repo for caching remote objects, making
537     # archives, etc.
538     if (!-d $local_repo) {
539       make_path($local_repo) or croak("Error: could not create $local_repo");
540     }
541     # This works (exits 0 and doesn't delete fetched objects) even
542     # if $local_repo is already initialized:
543     `$gitcmd init --bare`;
544     if ($?) {
545       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
546     }
547
548     # If $treeish looks like a hash (or abbrev hash) we look it up in
549     # our local cache first, since that's cheaper. (We don't want to
550     # do that with tags/branches though -- those change over time, so
551     # they should always be resolved by the remote repo.)
552     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
553       # Hide stderr because it's normal for this to fail:
554       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
555       if ($? == 0 &&
556           # Careful not to resolve a branch named abcdeff to commit 1234567:
557           $sha1 =~ /^$treeish/ &&
558           $sha1 =~ /^([0-9a-f]{40})$/s) {
559         $commit = $1;
560         Log(undef, "Commit $commit already present in $local_repo");
561       }
562     }
563
564     if (!defined $commit) {
565       # If $treeish isn't just a hash or abbrev hash, or isn't here
566       # yet, we need to fetch the remote to resolve it correctly.
567
568       # First, remove all local heads. This prevents a name that does
569       # not exist on the remote from resolving to (or colliding with)
570       # a previously fetched branch or tag (possibly from a different
571       # remote).
572       remove_tree("$local_repo/refs/heads", {keep_root => 1});
573
574       Log(undef, "Fetching objects from $repo to $local_repo");
575       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
576       if ($?) {
577         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
578       }
579     }
580
581     # Now that the data is all here, we will use our local repo for
582     # the rest of our git activities.
583     $repo = $local_repo;
584   }
585
586   my $gitcmd = "git --git-dir=\Q$repo\E";
587   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
588   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
589     croak("`$gitcmd rev-list` exited "
590           .exit_status_s($?)
591           .", '$treeish' not found. Giving up.");
592   }
593   $commit = $1;
594   Log(undef, "Version $treeish is commit $commit");
595
596   if ($commit ne $Job->{'script_version'}) {
597     # Record the real commit id in the database, frozentokey, logs,
598     # etc. -- instead of an abbreviation or a branch name which can
599     # become ambiguous or point to a different commit in the future.
600     if (!$Job->update_attributes('script_version' => $commit)) {
601       croak("Error: failed to update job's script_version attribute");
602     }
603   }
604
605   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
606   add_git_archive("$gitcmd archive ''\Q$commit\E");
607 }
608
609 my $git_archive = combined_git_archive();
610 if (!defined $git_archive) {
611   Log(undef, "Skip install phase (no git archive)");
612   if ($have_slurm) {
613     Log(undef, "Warning: This probably means workers have no source tree!");
614   }
615 }
616 else {
617   my $install_exited;
618   my $install_script_tries_left = 3;
619   for (my $attempts = 0; $attempts < 3; $attempts++) {
620     Log(undef, "Run install script on all workers");
621
622     my @srunargs = ("srun",
623                     "--nodelist=$nodelist",
624                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
625     my @execargs = ("sh", "-c",
626                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
627
628     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
629     my ($install_stderr_r, $install_stderr_w);
630     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
631     set_nonblocking($install_stderr_r);
632     my $installpid = fork();
633     if ($installpid == 0)
634     {
635       close($install_stderr_r);
636       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
637       open(STDOUT, ">&", $install_stderr_w);
638       open(STDERR, ">&", $install_stderr_w);
639       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
640       exit (1);
641     }
642     close($install_stderr_w);
643     # Tell freeze_if_want_freeze how to kill the child, otherwise the
644     # "waitpid(installpid)" loop won't get interrupted by a freeze:
645     $proc{$installpid} = {};
646     my $stderr_buf = '';
647     # Track whether anything appears on stderr other than slurm errors
648     # ("srun: ...") and the "starting: ..." message printed by the
649     # srun subroutine itself:
650     my $stderr_anything_from_script = 0;
651     my $match_our_own_errors = '^(srun: error: |starting: \[)';
652     while ($installpid != waitpid(-1, WNOHANG)) {
653       freeze_if_want_freeze ($installpid);
654       # Wait up to 0.1 seconds for something to appear on stderr, then
655       # do a non-blocking read.
656       my $bits = fhbits($install_stderr_r);
657       select ($bits, undef, $bits, 0.1);
658       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
659       {
660         while ($stderr_buf =~ /^(.*?)\n/) {
661           my $line = $1;
662           substr $stderr_buf, 0, 1+length($line), "";
663           Log(undef, "stderr $line");
664           if ($line !~ /$match_our_own_errors/) {
665             $stderr_anything_from_script = 1;
666           }
667         }
668       }
669     }
670     delete $proc{$installpid};
671     $install_exited = $?;
672     close($install_stderr_r);
673     if (length($stderr_buf) > 0) {
674       if ($stderr_buf !~ /$match_our_own_errors/) {
675         $stderr_anything_from_script = 1;
676       }
677       Log(undef, "stderr $stderr_buf")
678     }
679
680     Log (undef, "Install script exited ".exit_status_s($install_exited));
681     last if $install_exited == 0 || $main::please_freeze;
682     # If the install script fails but doesn't print an error message,
683     # the next thing anyone is likely to do is just run it again in
684     # case it was a transient problem like "slurm communication fails
685     # because the network isn't reliable enough". So we'll just do
686     # that ourselves (up to 3 attempts in total). OTOH, if there is an
687     # error message, the problem is more likely to have a real fix and
688     # we should fail the job so the fixing process can start, instead
689     # of doing 2 more attempts.
690     last if $stderr_anything_from_script;
691   }
692
693   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
694     unlink($tar_filename);
695   }
696
697   if ($install_exited != 0) {
698     croak("Giving up");
699   }
700 }
701
702 foreach (qw (script script_version script_parameters runtime_constraints))
703 {
704   Log (undef,
705        "$_ " .
706        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
707 }
708 foreach (split (/\n/, $Job->{knobs}))
709 {
710   Log (undef, "knob " . $_);
711 }
712
713
714
715 $main::success = undef;
716
717
718
719 ONELEVEL:
720
721 my $thisround_succeeded = 0;
722 my $thisround_failed = 0;
723 my $thisround_failed_multiple = 0;
724
725 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
726                        or $a <=> $b } @jobstep_todo;
727 my $level = $jobstep[$jobstep_todo[0]]->{level};
728
729 my $initial_tasks_this_level = 0;
730 foreach my $id (@jobstep_todo) {
731   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
732 }
733
734 # If the number of tasks scheduled at this level #T is smaller than the number
735 # of slots available #S, only use the first #T slots, or the first slot on
736 # each node, whichever number is greater.
737 #
738 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
739 # based on these numbers.  Using fewer slots makes more resources available
740 # to each individual task, which should normally be a better strategy when
741 # there are fewer of them running with less parallelism.
742 #
743 # Note that this calculation is not redone if the initial tasks at
744 # this level queue more tasks at the same level.  This may harm
745 # overall task throughput for that level.
746 my @freeslot;
747 if ($initial_tasks_this_level < @node) {
748   @freeslot = (0..$#node);
749 } elsif ($initial_tasks_this_level < @slot) {
750   @freeslot = (0..$initial_tasks_this_level - 1);
751 } else {
752   @freeslot = (0..$#slot);
753 }
754 my $round_num_freeslots = scalar(@freeslot);
755
756 my %round_max_slots = ();
757 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
758   my $this_slot = $slot[$freeslot[$ii]];
759   my $node_name = $this_slot->{node}->{name};
760   $round_max_slots{$node_name} ||= $this_slot->{cpu};
761   last if (scalar(keys(%round_max_slots)) >= @node);
762 }
763
764 Log(undef, "start level $level with $round_num_freeslots slots");
765 my @holdslot;
766 my %reader;
767 my $progress_is_dirty = 1;
768 my $progress_stats_updated = 0;
769
770 update_progress_stats();
771
772
773 THISROUND:
774 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
775 {
776   my $id = $jobstep_todo[$todo_ptr];
777   my $Jobstep = $jobstep[$id];
778   if ($Jobstep->{level} != $level)
779   {
780     next;
781   }
782
783   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
784   set_nonblocking($reader{$id});
785
786   my $childslot = $freeslot[0];
787   my $childnode = $slot[$childslot]->{node};
788   my $childslotname = join (".",
789                             $slot[$childslot]->{node}->{name},
790                             $slot[$childslot]->{cpu});
791
792   my $childpid = fork();
793   if ($childpid == 0)
794   {
795     $SIG{'INT'} = 'DEFAULT';
796     $SIG{'QUIT'} = 'DEFAULT';
797     $SIG{'TERM'} = 'DEFAULT';
798
799     foreach (values (%reader))
800     {
801       close($_);
802     }
803     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
804     open(STDOUT,">&writer");
805     open(STDERR,">&writer");
806
807     undef $dbh;
808     undef $sth;
809
810     delete $ENV{"GNUPGHOME"};
811     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
812     $ENV{"TASK_QSEQUENCE"} = $id;
813     $ENV{"TASK_SEQUENCE"} = $level;
814     $ENV{"JOB_SCRIPT"} = $Job->{script};
815     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
816       $param =~ tr/a-z/A-Z/;
817       $ENV{"JOB_PARAMETER_$param"} = $value;
818     }
819     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
820     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
821     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
822     $ENV{"HOME"} = $ENV{"TASK_WORK"};
823     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
824     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
825     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
826     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
827
828     $ENV{"GZIP"} = "-n";
829
830     my @srunargs = (
831       "srun",
832       "--nodelist=".$childnode->{name},
833       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
834       "--job-name=$job_id.$id.$$",
835         );
836     my $command =
837         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
838         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
839         ."&& cd $ENV{CRUNCH_TMP} "
840         # These environment variables get used explicitly later in
841         # $command.  No tool is expected to read these values directly.
842         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
843         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
844         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
845         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
846     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
847     if ($docker_hash)
848     {
849       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
850       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
851       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
852       # We only set memory limits if Docker lets us limit both memory and swap.
853       # Memory limits alone have been supported longer, but subprocesses tend
854       # to get SIGKILL if they exceed that without any swap limit set.
855       # See #5642 for additional background.
856       if ($docker_limitmem) {
857         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
858       }
859
860       # Dynamically configure the container to use the host system as its
861       # DNS server.  Get the host's global addresses from the ip command,
862       # and turn them into docker --dns options using gawk.
863       $command .=
864           q{$(ip -o address show scope global |
865               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
866
867       # The source tree and $destdir directory (which we have
868       # installed on the worker host) are available in the container,
869       # under the same path.
870       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
871       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
872
873       # Currently, we make arv-mount's mount point appear at /keep
874       # inside the container (instead of using the same path as the
875       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
876       # crunch scripts and utilities must not rely on this. They must
877       # use $TASK_KEEPMOUNT.
878       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
879       $ENV{TASK_KEEPMOUNT} = "/keep";
880
881       # TASK_WORK is almost exactly like a docker data volume: it
882       # starts out empty, is writable, and persists until no
883       # containers use it any more. We don't use --volumes-from to
884       # share it with other containers: it is only accessible to this
885       # task, and it goes away when this task stops.
886       #
887       # However, a docker data volume is writable only by root unless
888       # the mount point already happens to exist in the container with
889       # different permissions. Therefore, we [1] assume /tmp already
890       # exists in the image and is writable by the crunch user; [2]
891       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
892       # writable if they are created by docker while setting up the
893       # other --volumes); and [3] create $TASK_WORK inside the
894       # container using $build_script.
895       $command .= "--volume=/tmp ";
896       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
897       $ENV{"HOME"} = $ENV{"TASK_WORK"};
898       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
899
900       # TODO: Share a single JOB_WORK volume across all task
901       # containers on a given worker node, and delete it when the job
902       # ends (and, in case that doesn't work, when the next job
903       # starts).
904       #
905       # For now, use the same approach as TASK_WORK above.
906       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
907
908       while (my ($env_key, $env_val) = each %ENV)
909       {
910         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
911           $command .= "--env=\Q$env_key=$env_val\E ";
912         }
913       }
914       $command .= "--env=\QHOME=$ENV{HOME}\E ";
915       $command .= "\Q$docker_hash\E ";
916       $command .= "stdbuf --output=0 --error=0 ";
917       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
918     } else {
919       # Non-docker run
920       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
921       $command .= "stdbuf --output=0 --error=0 ";
922       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
923     }
924
925     my @execargs = ('bash', '-c', $command);
926     srun (\@srunargs, \@execargs, undef, $build_script);
927     # exec() failed, we assume nothing happened.
928     die "srun() failed on build script\n";
929   }
930   close("writer");
931   if (!defined $childpid)
932   {
933     close $reader{$id};
934     delete $reader{$id};
935     next;
936   }
937   shift @freeslot;
938   $proc{$childpid} = { jobstep => $id,
939                        time => time,
940                        slot => $childslot,
941                        jobstepname => "$job_id.$id.$childpid",
942                      };
943   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
944   $slot[$childslot]->{pid} = $childpid;
945
946   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
947   Log ($id, "child $childpid started on $childslotname");
948   $Jobstep->{starttime} = time;
949   $Jobstep->{node} = $childnode->{name};
950   $Jobstep->{slotindex} = $childslot;
951   delete $Jobstep->{stderr};
952   delete $Jobstep->{finishtime};
953
954   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
955   $Jobstep->{'arvados_task'}->save;
956
957   splice @jobstep_todo, $todo_ptr, 1;
958   --$todo_ptr;
959
960   $progress_is_dirty = 1;
961
962   while (!@freeslot
963          ||
964          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
965   {
966     last THISROUND if $main::please_freeze || defined($main::success);
967     if ($main::please_info)
968     {
969       $main::please_info = 0;
970       freeze();
971       create_output_collection();
972       save_meta(1);
973       update_progress_stats();
974     }
975     my $gotsome
976         = readfrompipes ()
977         + reapchildren ();
978     if (!$gotsome)
979     {
980       check_refresh_wanted();
981       check_squeue();
982       update_progress_stats();
983       select (undef, undef, undef, 0.1);
984     }
985     elsif (time - $progress_stats_updated >= 30)
986     {
987       update_progress_stats();
988     }
989     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
990         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
991     {
992       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
993           .($thisround_failed+$thisround_succeeded)
994           .") -- giving up on this round";
995       Log (undef, $message);
996       last THISROUND;
997     }
998
999     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1000     for (my $i=$#freeslot; $i>=0; $i--) {
1001       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1002         push @holdslot, (splice @freeslot, $i, 1);
1003       }
1004     }
1005     for (my $i=$#holdslot; $i>=0; $i--) {
1006       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1007         push @freeslot, (splice @holdslot, $i, 1);
1008       }
1009     }
1010
1011     # give up if no nodes are succeeding
1012     if (!grep { $_->{node}->{losing_streak} == 0 &&
1013                     $_->{node}->{hold_count} < 4 } @slot) {
1014       my $message = "Every node has failed -- giving up on this round";
1015       Log (undef, $message);
1016       last THISROUND;
1017     }
1018   }
1019 }
1020
1021
1022 push @freeslot, splice @holdslot;
1023 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1024
1025
1026 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1027 while (%proc)
1028 {
1029   if ($main::please_continue) {
1030     $main::please_continue = 0;
1031     goto THISROUND;
1032   }
1033   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1034   readfrompipes ();
1035   if (!reapchildren())
1036   {
1037     check_refresh_wanted();
1038     check_squeue();
1039     update_progress_stats();
1040     select (undef, undef, undef, 0.1);
1041     killem (keys %proc) if $main::please_freeze;
1042   }
1043 }
1044
1045 update_progress_stats();
1046 freeze_if_want_freeze();
1047
1048
1049 if (!defined $main::success)
1050 {
1051   if (@jobstep_todo &&
1052       $thisround_succeeded == 0 &&
1053       ($thisround_failed == 0 || $thisround_failed > 4))
1054   {
1055     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1056     Log (undef, $message);
1057     $main::success = 0;
1058   }
1059   if (!@jobstep_todo)
1060   {
1061     $main::success = 1;
1062   }
1063 }
1064
1065 goto ONELEVEL if !defined $main::success;
1066
1067
1068 release_allocation();
1069 freeze();
1070 my $collated_output = &create_output_collection();
1071
1072 if (!$collated_output) {
1073   Log (undef, "Failed to write output collection");
1074 }
1075 else {
1076   Log(undef, "job output $collated_output");
1077   $Job->update_attributes('output' => $collated_output);
1078 }
1079
1080 Log (undef, "finish");
1081
1082 save_meta();
1083
1084 my $final_state;
1085 if ($collated_output && $main::success) {
1086   $final_state = 'Complete';
1087 } else {
1088   $final_state = 'Failed';
1089 }
1090 $Job->update_attributes('state' => $final_state);
1091
1092 exit (($final_state eq 'Complete') ? 0 : 1);
1093
1094
1095
1096 sub update_progress_stats
1097 {
1098   $progress_stats_updated = time;
1099   return if !$progress_is_dirty;
1100   my ($todo, $done, $running) = (scalar @jobstep_todo,
1101                                  scalar @jobstep_done,
1102                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1103   $Job->{'tasks_summary'} ||= {};
1104   $Job->{'tasks_summary'}->{'todo'} = $todo;
1105   $Job->{'tasks_summary'}->{'done'} = $done;
1106   $Job->{'tasks_summary'}->{'running'} = $running;
1107   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1108   Log (undef, "status: $done done, $running running, $todo todo");
1109   $progress_is_dirty = 0;
1110 }
1111
1112
1113
1114 sub reapchildren
1115 {
1116   my $pid = waitpid (-1, WNOHANG);
1117   return 0 if $pid <= 0;
1118
1119   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1120                   . "."
1121                   . $slot[$proc{$pid}->{slot}]->{cpu});
1122   my $jobstepid = $proc{$pid}->{jobstep};
1123   my $elapsed = time - $proc{$pid}->{time};
1124   my $Jobstep = $jobstep[$jobstepid];
1125
1126   my $childstatus = $?;
1127   my $exitvalue = $childstatus >> 8;
1128   my $exitinfo = "exit ".exit_status_s($childstatus);
1129   $Jobstep->{'arvados_task'}->reload;
1130   my $task_success = $Jobstep->{'arvados_task'}->{success};
1131
1132   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1133
1134   if (!defined $task_success) {
1135     # task did not indicate one way or the other --> fail
1136     $Jobstep->{'arvados_task'}->{success} = 0;
1137     $Jobstep->{'arvados_task'}->save;
1138     $task_success = 0;
1139   }
1140
1141   if (!$task_success)
1142   {
1143     my $temporary_fail;
1144     $temporary_fail ||= $Jobstep->{node_fail};
1145     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1146
1147     ++$thisround_failed;
1148     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1149
1150     # Check for signs of a failed or misconfigured node
1151     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1152         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1153       # Don't count this against jobstep failure thresholds if this
1154       # node is already suspected faulty and srun exited quickly
1155       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1156           $elapsed < 5) {
1157         Log ($jobstepid, "blaming failure on suspect node " .
1158              $slot[$proc{$pid}->{slot}]->{node}->{name});
1159         $temporary_fail ||= 1;
1160       }
1161       ban_node_by_slot($proc{$pid}->{slot});
1162     }
1163
1164     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1165                              ++$Jobstep->{'failures'},
1166                              $temporary_fail ? 'temporary' : 'permanent',
1167                              $elapsed));
1168
1169     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1170       # Give up on this task, and the whole job
1171       $main::success = 0;
1172     }
1173     # Put this task back on the todo queue
1174     push @jobstep_todo, $jobstepid;
1175     $Job->{'tasks_summary'}->{'failed'}++;
1176   }
1177   else
1178   {
1179     ++$thisround_succeeded;
1180     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1181     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1182     push @jobstep_done, $jobstepid;
1183     Log ($jobstepid, "success in $elapsed seconds");
1184   }
1185   $Jobstep->{exitcode} = $childstatus;
1186   $Jobstep->{finishtime} = time;
1187   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1188   $Jobstep->{'arvados_task'}->save;
1189   process_stderr ($jobstepid, $task_success);
1190   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1191                            length($Jobstep->{'arvados_task'}->{output}),
1192                            $Jobstep->{'arvados_task'}->{output}));
1193
1194   close $reader{$jobstepid};
1195   delete $reader{$jobstepid};
1196   delete $slot[$proc{$pid}->{slot}]->{pid};
1197   push @freeslot, $proc{$pid}->{slot};
1198   delete $proc{$pid};
1199
1200   if ($task_success) {
1201     # Load new tasks
1202     my $newtask_list = [];
1203     my $newtask_results;
1204     do {
1205       $newtask_results = api_call(
1206         "job_tasks/list",
1207         'where' => {
1208           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1209         },
1210         'order' => 'qsequence',
1211         'offset' => scalar(@$newtask_list),
1212       );
1213       push(@$newtask_list, @{$newtask_results->{items}});
1214     } while (@{$newtask_results->{items}});
1215     foreach my $arvados_task (@$newtask_list) {
1216       my $jobstep = {
1217         'level' => $arvados_task->{'sequence'},
1218         'failures' => 0,
1219         'arvados_task' => $arvados_task
1220       };
1221       push @jobstep, $jobstep;
1222       push @jobstep_todo, $#jobstep;
1223     }
1224   }
1225
1226   $progress_is_dirty = 1;
1227   1;
1228 }
1229
1230 sub check_refresh_wanted
1231 {
1232   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1233   if (@stat && $stat[9] > $latest_refresh) {
1234     $latest_refresh = scalar time;
1235     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1236     for my $attr ('cancelled_at',
1237                   'cancelled_by_user_uuid',
1238                   'cancelled_by_client_uuid',
1239                   'state') {
1240       $Job->{$attr} = $Job2->{$attr};
1241     }
1242     if ($Job->{'state'} ne "Running") {
1243       if ($Job->{'state'} eq "Cancelled") {
1244         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1245       } else {
1246         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1247       }
1248       $main::success = 0;
1249       $main::please_freeze = 1;
1250     }
1251   }
1252 }
1253
1254 sub check_squeue
1255 {
1256   my $last_squeue_check = $squeue_checked;
1257
1258   # Do not call `squeue` or check the kill list more than once every
1259   # 15 seconds.
1260   return if $last_squeue_check > time - 15;
1261   $squeue_checked = time;
1262
1263   # Look for children from which we haven't received stderr data since
1264   # the last squeue check. If no such children exist, all procs are
1265   # alive and there's no need to even look at squeue.
1266   #
1267   # As long as the crunchstat poll interval (10s) is shorter than the
1268   # squeue check interval (15s) this should make the squeue check an
1269   # infrequent event.
1270   my $silent_procs = 0;
1271   for my $jobstep (values %proc)
1272   {
1273     if ($jobstep->{stderr_at} < $last_squeue_check)
1274     {
1275       $silent_procs++;
1276     }
1277   }
1278   return if $silent_procs == 0;
1279
1280   # use killem() on procs whose killtime is reached
1281   while (my ($pid, $jobstep) = each %proc)
1282   {
1283     if (exists $jobstep->{killtime}
1284         && $jobstep->{killtime} <= time
1285         && $jobstep->{stderr_at} < $last_squeue_check)
1286     {
1287       my $sincewhen = "";
1288       if ($jobstep->{stderr_at}) {
1289         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1290       }
1291       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1292       killem ($pid);
1293     }
1294   }
1295
1296   if (!$have_slurm)
1297   {
1298     # here is an opportunity to check for mysterious problems with local procs
1299     return;
1300   }
1301
1302   # Get a list of steps still running.  Note: squeue(1) says --steps
1303   # selects a format (which we override anyway) and allows us to
1304   # specify which steps we're interested in (which we don't).
1305   # Importantly, it also changes the meaning of %j from "job name" to
1306   # "step name" and (although this isn't mentioned explicitly in the
1307   # docs) switches from "one line per job" mode to "one line per step"
1308   # mode. Without it, we'd just get a list of one job, instead of a
1309   # list of N steps.
1310   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1311   if ($? != 0)
1312   {
1313     Log(undef, "warning: squeue exit status $? ($!)");
1314     return;
1315   }
1316   chop @squeue;
1317
1318   # which of my jobsteps are running, according to squeue?
1319   my %ok;
1320   for my $jobstepname (@squeue)
1321   {
1322     $ok{$jobstepname} = 1;
1323   }
1324
1325   # Check for child procs >60s old and not mentioned by squeue.
1326   while (my ($pid, $jobstep) = each %proc)
1327   {
1328     if ($jobstep->{time} < time - 60
1329         && $jobstep->{jobstepname}
1330         && !exists $ok{$jobstep->{jobstepname}}
1331         && !exists $jobstep->{killtime})
1332     {
1333       # According to slurm, this task has ended (successfully or not)
1334       # -- but our srun child hasn't exited. First we must wait (30
1335       # seconds) in case this is just a race between communication
1336       # channels. Then, if our srun child process still hasn't
1337       # terminated, we'll conclude some slurm communication
1338       # error/delay has caused the task to die without notifying srun,
1339       # and we'll kill srun ourselves.
1340       $jobstep->{killtime} = time + 30;
1341       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1342     }
1343   }
1344 }
1345
1346
1347 sub release_allocation
1348 {
1349   if ($have_slurm)
1350   {
1351     Log (undef, "release job allocation");
1352     system "scancel $ENV{SLURM_JOB_ID}";
1353   }
1354 }
1355
1356
1357 sub readfrompipes
1358 {
1359   my $gotsome = 0;
1360   foreach my $job (keys %reader)
1361   {
1362     my $buf;
1363     while (0 < sysread ($reader{$job}, $buf, 8192))
1364     {
1365       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1366       $jobstep[$job]->{stderr_at} = time;
1367       $jobstep[$job]->{stderr} .= $buf;
1368       preprocess_stderr ($job);
1369       if (length ($jobstep[$job]->{stderr}) > 16384)
1370       {
1371         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1372       }
1373       $gotsome = 1;
1374     }
1375   }
1376   return $gotsome;
1377 }
1378
1379
1380 sub preprocess_stderr
1381 {
1382   my $job = shift;
1383
1384   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1385     my $line = $1;
1386     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1387     Log ($job, "stderr $line");
1388     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1389       # whoa.
1390       $main::please_freeze = 1;
1391     }
1392     elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1393       $jobstep[$job]->{node_fail} = 1;
1394       ban_node_by_slot($jobstep[$job]->{slotindex});
1395     }
1396   }
1397 }
1398
1399
1400 sub process_stderr
1401 {
1402   my $job = shift;
1403   my $task_success = shift;
1404   preprocess_stderr ($job);
1405
1406   map {
1407     Log ($job, "stderr $_");
1408   } split ("\n", $jobstep[$job]->{stderr});
1409 }
1410
1411 sub fetch_block
1412 {
1413   my $hash = shift;
1414   my $keep;
1415   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1416     Log(undef, "fetch_block run error from arv-get $hash: $!");
1417     return undef;
1418   }
1419   my $output_block = "";
1420   while (1) {
1421     my $buf;
1422     my $bytes = sysread($keep, $buf, 1024 * 1024);
1423     if (!defined $bytes) {
1424       Log(undef, "fetch_block read error from arv-get: $!");
1425       $output_block = undef;
1426       last;
1427     } elsif ($bytes == 0) {
1428       # sysread returns 0 at the end of the pipe.
1429       last;
1430     } else {
1431       # some bytes were read into buf.
1432       $output_block .= $buf;
1433     }
1434   }
1435   close $keep;
1436   if ($?) {
1437     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1438     $output_block = undef;
1439   }
1440   return $output_block;
1441 }
1442
1443 # Create a collection by concatenating the output of all tasks (each
1444 # task's output is either a manifest fragment, a locator for a
1445 # manifest fragment stored in Keep, or nothing at all). Return the
1446 # portable_data_hash of the new collection.
1447 sub create_output_collection
1448 {
1449   Log (undef, "collate");
1450
1451   my ($child_out, $child_in);
1452   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1453 import arvados
1454 import sys
1455 print (arvados.api("v1").collections().
1456        create(body={"manifest_text": sys.stdin.read()}).
1457        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1458 }, retry_count());
1459
1460   my $task_idx = -1;
1461   my $manifest_size = 0;
1462   for (@jobstep)
1463   {
1464     ++$task_idx;
1465     my $output = $_->{'arvados_task'}->{output};
1466     next if (!defined($output));
1467     my $next_write;
1468     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1469       $next_write = fetch_block($output);
1470     } else {
1471       $next_write = $output;
1472     }
1473     if (defined($next_write)) {
1474       if (!defined(syswrite($child_in, $next_write))) {
1475         # There's been an error writing.  Stop the loop.
1476         # We'll log details about the exit code later.
1477         last;
1478       } else {
1479         $manifest_size += length($next_write);
1480       }
1481     } else {
1482       my $uuid = $_->{'arvados_task'}->{'uuid'};
1483       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1484       $main::success = 0;
1485     }
1486   }
1487   close($child_in);
1488   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1489
1490   my $joboutput;
1491   my $s = IO::Select->new($child_out);
1492   if ($s->can_read(120)) {
1493     sysread($child_out, $joboutput, 1024 * 1024);
1494     waitpid($pid, 0);
1495     if ($?) {
1496       Log(undef, "output collection creation exited " . exit_status_s($?));
1497       $joboutput = undef;
1498     } else {
1499       chomp($joboutput);
1500     }
1501   } else {
1502     Log (undef, "timed out while creating output collection");
1503     foreach my $signal (2, 2, 2, 15, 15, 9) {
1504       kill($signal, $pid);
1505       last if waitpid($pid, WNOHANG) == -1;
1506       sleep(1);
1507     }
1508   }
1509   close($child_out);
1510
1511   return $joboutput;
1512 }
1513
1514
1515 sub killem
1516 {
1517   foreach (@_)
1518   {
1519     my $sig = 2;                # SIGINT first
1520     if (exists $proc{$_}->{"sent_$sig"} &&
1521         time - $proc{$_}->{"sent_$sig"} > 4)
1522     {
1523       $sig = 15;                # SIGTERM if SIGINT doesn't work
1524     }
1525     if (exists $proc{$_}->{"sent_$sig"} &&
1526         time - $proc{$_}->{"sent_$sig"} > 4)
1527     {
1528       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1529     }
1530     if (!exists $proc{$_}->{"sent_$sig"})
1531     {
1532       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1533       kill $sig, $_;
1534       select (undef, undef, undef, 0.1);
1535       if ($sig == 2)
1536       {
1537         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1538       }
1539       $proc{$_}->{"sent_$sig"} = time;
1540       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1541     }
1542   }
1543 }
1544
1545
1546 sub fhbits
1547 {
1548   my($bits);
1549   for (@_) {
1550     vec($bits,fileno($_),1) = 1;
1551   }
1552   $bits;
1553 }
1554
1555
1556 # Send log output to Keep via arv-put.
1557 #
1558 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1559 # $log_pipe_pid is the pid of the arv-put subprocess.
1560 #
1561 # The only functions that should access these variables directly are:
1562 #
1563 # log_writer_start($logfilename)
1564 #     Starts an arv-put pipe, reading data on stdin and writing it to
1565 #     a $logfilename file in an output collection.
1566 #
1567 # log_writer_send($txt)
1568 #     Writes $txt to the output log collection.
1569 #
1570 # log_writer_finish()
1571 #     Closes the arv-put pipe and returns the output that it produces.
1572 #
1573 # log_writer_is_active()
1574 #     Returns a true value if there is currently a live arv-put
1575 #     process, false otherwise.
1576 #
1577 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1578
1579 sub log_writer_start($)
1580 {
1581   my $logfilename = shift;
1582   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1583                         'arv-put',
1584                         '--portable-data-hash',
1585                         '--project-uuid', $Job->{owner_uuid},
1586                         '--retries', '3',
1587                         '--name', $logfilename,
1588                         '--filename', $logfilename,
1589                         '-');
1590 }
1591
1592 sub log_writer_send($)
1593 {
1594   my $txt = shift;
1595   print $log_pipe_in $txt;
1596 }
1597
1598 sub log_writer_finish()
1599 {
1600   return unless $log_pipe_pid;
1601
1602   close($log_pipe_in);
1603   my $arv_put_output;
1604
1605   my $s = IO::Select->new($log_pipe_out);
1606   if ($s->can_read(120)) {
1607     sysread($log_pipe_out, $arv_put_output, 1024);
1608     chomp($arv_put_output);
1609   } else {
1610     Log (undef, "timed out reading from 'arv-put'");
1611   }
1612
1613   waitpid($log_pipe_pid, 0);
1614   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1615   if ($?) {
1616     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1617   }
1618
1619   return $arv_put_output;
1620 }
1621
1622 sub log_writer_is_active() {
1623   return $log_pipe_pid;
1624 }
1625
1626 sub Log                         # ($jobstep_id, $logmessage)
1627 {
1628   if ($_[1] =~ /\n/) {
1629     for my $line (split (/\n/, $_[1])) {
1630       Log ($_[0], $line);
1631     }
1632     return;
1633   }
1634   my $fh = select STDERR; $|=1; select $fh;
1635   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1636   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1637   $message .= "\n";
1638   my $datetime;
1639   if (log_writer_is_active() || -t STDERR) {
1640     my @gmtime = gmtime;
1641     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1642                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1643   }
1644   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1645
1646   if (log_writer_is_active()) {
1647     log_writer_send($datetime . " " . $message);
1648   }
1649 }
1650
1651
1652 sub croak
1653 {
1654   my ($package, $file, $line) = caller;
1655   my $message = "@_ at $file line $line\n";
1656   Log (undef, $message);
1657   freeze() if @jobstep_todo;
1658   create_output_collection() if @jobstep_todo;
1659   cleanup();
1660   save_meta();
1661   die;
1662 }
1663
1664
1665 sub cleanup
1666 {
1667   return unless $Job;
1668   if ($Job->{'state'} eq 'Cancelled') {
1669     $Job->update_attributes('finished_at' => scalar gmtime);
1670   } else {
1671     $Job->update_attributes('state' => 'Failed');
1672   }
1673 }
1674
1675
1676 sub save_meta
1677 {
1678   my $justcheckpoint = shift; # false if this will be the last meta saved
1679   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1680   return unless log_writer_is_active();
1681
1682   my $loglocator = log_writer_finish();
1683   Log (undef, "log manifest is $loglocator");
1684   $Job->{'log'} = $loglocator;
1685   $Job->update_attributes('log', $loglocator);
1686 }
1687
1688
1689 sub freeze_if_want_freeze
1690 {
1691   if ($main::please_freeze)
1692   {
1693     release_allocation();
1694     if (@_)
1695     {
1696       # kill some srun procs before freeze+stop
1697       map { $proc{$_} = {} } @_;
1698       while (%proc)
1699       {
1700         killem (keys %proc);
1701         select (undef, undef, undef, 0.1);
1702         my $died;
1703         while (($died = waitpid (-1, WNOHANG)) > 0)
1704         {
1705           delete $proc{$died};
1706         }
1707       }
1708     }
1709     freeze();
1710     create_output_collection();
1711     cleanup();
1712     save_meta();
1713     exit 1;
1714   }
1715 }
1716
1717
1718 sub freeze
1719 {
1720   Log (undef, "Freeze not implemented");
1721   return;
1722 }
1723
1724
1725 sub thaw
1726 {
1727   croak ("Thaw not implemented");
1728 }
1729
1730
1731 sub freezequote
1732 {
1733   my $s = shift;
1734   $s =~ s/\\/\\\\/g;
1735   $s =~ s/\n/\\n/g;
1736   return $s;
1737 }
1738
1739
1740 sub freezeunquote
1741 {
1742   my $s = shift;
1743   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1744   return $s;
1745 }
1746
1747
1748 sub srun
1749 {
1750   my $srunargs = shift;
1751   my $execargs = shift;
1752   my $opts = shift || {};
1753   my $stdin = shift;
1754   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1755
1756   $Data::Dumper::Terse = 1;
1757   $Data::Dumper::Indent = 0;
1758   my $show_cmd = Dumper($args);
1759   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1760   $show_cmd =~ s/\n/ /g;
1761   if ($opts->{fork}) {
1762     Log(undef, "starting: $show_cmd");
1763   } else {
1764     # This is a child process: parent is in charge of reading our
1765     # stderr and copying it to Log() if needed.
1766     warn "starting: $show_cmd\n";
1767   }
1768
1769   if (defined $stdin) {
1770     my $child = open STDIN, "-|";
1771     defined $child or die "no fork: $!";
1772     if ($child == 0) {
1773       print $stdin or die $!;
1774       close STDOUT or die $!;
1775       exit 0;
1776     }
1777   }
1778
1779   return system (@$args) if $opts->{fork};
1780
1781   exec @$args;
1782   warn "ENV size is ".length(join(" ",%ENV));
1783   die "exec failed: $!: @$args";
1784 }
1785
1786
1787 sub ban_node_by_slot {
1788   # Don't start any new jobsteps on this node for 60 seconds
1789   my $slotid = shift;
1790   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1791   $slot[$slotid]->{node}->{hold_count}++;
1792   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1793 }
1794
1795 sub must_lock_now
1796 {
1797   my ($lockfile, $error_message) = @_;
1798   open L, ">", $lockfile or croak("$lockfile: $!");
1799   if (!flock L, LOCK_EX|LOCK_NB) {
1800     croak("Can't lock $lockfile: $error_message\n");
1801   }
1802 }
1803
1804 sub find_docker_image {
1805   # Given a Keep locator, check to see if it contains a Docker image.
1806   # If so, return its stream name and Docker hash.
1807   # If not, return undef for both values.
1808   my $locator = shift;
1809   my ($streamname, $filename);
1810   my $image = api_call("collections/get", uuid => $locator);
1811   if ($image) {
1812     foreach my $line (split(/\n/, $image->{manifest_text})) {
1813       my @tokens = split(/\s+/, $line);
1814       next if (!@tokens);
1815       $streamname = shift(@tokens);
1816       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1817         if (defined($filename)) {
1818           return (undef, undef);  # More than one file in the Collection.
1819         } else {
1820           $filename = (split(/:/, $filedata, 3))[2];
1821         }
1822       }
1823     }
1824   }
1825   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1826     return ($streamname, $1);
1827   } else {
1828     return (undef, undef);
1829   }
1830 }
1831
1832 sub retry_count {
1833   # Calculate the number of times an operation should be retried,
1834   # assuming exponential backoff, and that we're willing to retry as
1835   # long as tasks have been running.  Enforce a minimum of 3 retries.
1836   my ($starttime, $endtime, $timediff, $retries);
1837   if (@jobstep) {
1838     $starttime = $jobstep[0]->{starttime};
1839     $endtime = $jobstep[-1]->{finishtime};
1840   }
1841   if (!defined($starttime)) {
1842     $timediff = 0;
1843   } elsif (!defined($endtime)) {
1844     $timediff = time - $starttime;
1845   } else {
1846     $timediff = ($endtime - $starttime) - (time - $endtime);
1847   }
1848   if ($timediff > 0) {
1849     $retries = int(log($timediff) / log(2));
1850   } else {
1851     $retries = 1;  # Use the minimum.
1852   }
1853   return ($retries > 3) ? $retries : 3;
1854 }
1855
1856 sub retry_op {
1857   # Pass in two function references.
1858   # This method will be called with the remaining arguments.
1859   # If it dies, retry it with exponential backoff until it succeeds,
1860   # or until the current retry_count is exhausted.  After each failure
1861   # that can be retried, the second function will be called with
1862   # the current try count (0-based), next try time, and error message.
1863   my $operation = shift;
1864   my $retry_callback = shift;
1865   my $retries = retry_count();
1866   foreach my $try_count (0..$retries) {
1867     my $next_try = time + (2 ** $try_count);
1868     my $result = eval { $operation->(@_); };
1869     if (!$@) {
1870       return $result;
1871     } elsif ($try_count < $retries) {
1872       $retry_callback->($try_count, $next_try, $@);
1873       my $sleep_time = $next_try - time;
1874       sleep($sleep_time) if ($sleep_time > 0);
1875     }
1876   }
1877   # Ensure the error message ends in a newline, so Perl doesn't add
1878   # retry_op's line number to it.
1879   chomp($@);
1880   die($@ . "\n");
1881 }
1882
1883 sub api_call {
1884   # Pass in a /-separated API method name, and arguments for it.
1885   # This function will call that method, retrying as needed until
1886   # the current retry_count is exhausted, with a log on the first failure.
1887   my $method_name = shift;
1888   my $log_api_retry = sub {
1889     my ($try_count, $next_try_at, $errmsg) = @_;
1890     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1891     $errmsg =~ s/\s/ /g;
1892     $errmsg =~ s/\s+$//;
1893     my $retry_msg;
1894     if ($next_try_at < time) {
1895       $retry_msg = "Retrying.";
1896     } else {
1897       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1898       $retry_msg = "Retrying at $next_try_fmt.";
1899     }
1900     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1901   };
1902   my $method = $arv;
1903   foreach my $key (split(/\//, $method_name)) {
1904     $method = $method->{$key};
1905   }
1906   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1907 }
1908
1909 sub exit_status_s {
1910   # Given a $?, return a human-readable exit code string like "0" or
1911   # "1" or "0 with signal 1" or "1 with signal 11".
1912   my $exitcode = shift;
1913   my $s = $exitcode >> 8;
1914   if ($exitcode & 0x7f) {
1915     $s .= " with signal " . ($exitcode & 0x7f);
1916   }
1917   if ($exitcode & 0x80) {
1918     $s .= " with core dump";
1919   }
1920   return $s;
1921 }
1922
1923 sub handle_readall {
1924   # Pass in a glob reference to a file handle.
1925   # Read all its contents and return them as a string.
1926   my $fh_glob_ref = shift;
1927   local $/ = undef;
1928   return <$fh_glob_ref>;
1929 }
1930
1931 sub tar_filename_n {
1932   my $n = shift;
1933   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1934 }
1935
1936 sub add_git_archive {
1937   # Pass in a git archive command as a string or list, a la system().
1938   # This method will save its output to be included in the archive sent to the
1939   # build script.
1940   my $git_input;
1941   $git_tar_count++;
1942   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1943     croak("Failed to save git archive: $!");
1944   }
1945   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1946   close($git_input);
1947   waitpid($git_pid, 0);
1948   close(GIT_ARCHIVE);
1949   if ($?) {
1950     croak("Failed to save git archive: git exited " . exit_status_s($?));
1951   }
1952 }
1953
1954 sub combined_git_archive {
1955   # Combine all saved tar archives into a single archive, then return its
1956   # contents in a string.  Return undef if no archives have been saved.
1957   if ($git_tar_count < 1) {
1958     return undef;
1959   }
1960   my $base_tar_name = tar_filename_n(1);
1961   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1962     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1963     if ($tar_exit != 0) {
1964       croak("Error preparing build archive: tar -A exited " .
1965             exit_status_s($tar_exit));
1966     }
1967   }
1968   if (!open(GIT_TAR, "<", $base_tar_name)) {
1969     croak("Could not open build archive: $!");
1970   }
1971   my $tar_contents = handle_readall(\*GIT_TAR);
1972   close(GIT_TAR);
1973   return $tar_contents;
1974 }
1975
1976 sub set_nonblocking {
1977   my $fh = shift;
1978   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
1979   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
1980 }
1981
1982 __DATA__
1983 #!/usr/bin/perl
1984 #
1985 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1986 # server invokes this script on individual compute nodes, or localhost if we're
1987 # running a job locally.  It gets called in two modes:
1988 #
1989 # * No arguments: Installation mode.  Read a tar archive from the DATA
1990 #   file handle; it includes the Crunch script's source code, and
1991 #   maybe SDKs as well.  Those should be installed in the proper
1992 #   locations.  This runs outside of any Docker container, so don't try to
1993 #   introspect Crunch's runtime environment.
1994 #
1995 # * With arguments: Crunch script run mode.  This script should set up the
1996 #   environment, then run the command specified in the arguments.  This runs
1997 #   inside any Docker container.
1998
1999 use Fcntl ':flock';
2000 use File::Path qw( make_path remove_tree );
2001 use POSIX qw(getcwd);
2002
2003 use constant TASK_TEMPFAIL => 111;
2004
2005 # Map SDK subdirectories to the path environments they belong to.
2006 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2007
2008 my $destdir = $ENV{"CRUNCH_SRC"};
2009 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2010 my $repo = $ENV{"CRUNCH_SRC_URL"};
2011 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2012 my $job_work = $ENV{"JOB_WORK"};
2013 my $task_work = $ENV{"TASK_WORK"};
2014
2015 open(STDOUT_ORIG, ">&", STDOUT);
2016 open(STDERR_ORIG, ">&", STDERR);
2017
2018 for my $dir ($destdir, $job_work, $task_work) {
2019   if ($dir) {
2020     make_path $dir;
2021     -e $dir or die "Failed to create temporary directory ($dir): $!";
2022   }
2023 }
2024
2025 if ($task_work) {
2026   remove_tree($task_work, {keep_root => 1});
2027 }
2028
2029 ### Crunch script run mode
2030 if (@ARGV) {
2031   # We want to do routine logging during task 0 only.  This gives the user
2032   # the information they need, but avoids repeating the information for every
2033   # task.
2034   my $Log;
2035   if ($ENV{TASK_SEQUENCE} eq "0") {
2036     $Log = sub {
2037       my $msg = shift;
2038       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2039     };
2040   } else {
2041     $Log = sub { };
2042   }
2043
2044   my $python_src = "$install_dir/python";
2045   my $venv_dir = "$job_work/.arvados.venv";
2046   my $venv_built = -e "$venv_dir/bin/activate";
2047   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2048     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2049                  "--python=python2.7", $venv_dir);
2050     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2051     $venv_built = 1;
2052     $Log->("Built Python SDK virtualenv");
2053   }
2054
2055   my $pip_bin = "pip";
2056   if ($venv_built) {
2057     $Log->("Running in Python SDK virtualenv");
2058     $pip_bin = "$venv_dir/bin/pip";
2059     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2060     @ARGV = ("/bin/sh", "-ec",
2061              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2062   } elsif (-d $python_src) {
2063     $Log->("Warning: virtualenv not found inside Docker container default " .
2064            "\$PATH. Can't install Python SDK.");
2065   }
2066
2067   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2068   if ($pkgs) {
2069     $Log->("Using Arvados SDK:");
2070     foreach my $line (split /\n/, $pkgs) {
2071       $Log->($line);
2072     }
2073   } else {
2074     $Log->("Arvados SDK packages not found");
2075   }
2076
2077   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2078     my $sdk_path = "$install_dir/$sdk_dir";
2079     if (-d $sdk_path) {
2080       if ($ENV{$sdk_envkey}) {
2081         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2082       } else {
2083         $ENV{$sdk_envkey} = $sdk_path;
2084       }
2085       $Log->("Arvados SDK added to %s", $sdk_envkey);
2086     }
2087   }
2088
2089   exec(@ARGV);
2090   die "Cannot exec `@ARGV`: $!";
2091 }
2092
2093 ### Installation mode
2094 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2095 flock L, LOCK_EX;
2096 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2097   # This exact git archive (source + arvados sdk) is already installed
2098   # here, so there's no need to reinstall it.
2099
2100   # We must consume our DATA section, though: otherwise the process
2101   # feeding it to us will get SIGPIPE.
2102   my $buf;
2103   while (read(DATA, $buf, 65536)) { }
2104
2105   exit(0);
2106 }
2107
2108 unlink "$destdir.archive_hash";
2109 mkdir $destdir;
2110
2111 do {
2112   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2113   local $SIG{PIPE} = "IGNORE";
2114   warn "Extracting archive: $archive_hash\n";
2115   # --ignore-zeros is necessary sometimes: depending on how much NUL
2116   # padding tar -A put on our combined archive (which in turn depends
2117   # on the length of the component archives) tar without
2118   # --ignore-zeros will exit before consuming stdin and cause close()
2119   # to fail on the resulting SIGPIPE.
2120   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2121     die "Error launching 'tar -xC $destdir': $!";
2122   }
2123   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2124   # get SIGPIPE.  We must feed it data incrementally.
2125   my $tar_input;
2126   while (read(DATA, $tar_input, 65536)) {
2127     print TARX $tar_input;
2128   }
2129   if(!close(TARX)) {
2130     die "'tar -xC $destdir' exited $?: $!";
2131   }
2132 };
2133
2134 mkdir $install_dir;
2135
2136 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2137 if (-d $sdk_root) {
2138   foreach my $sdk_lang (("python",
2139                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2140     if (-d "$sdk_root/$sdk_lang") {
2141       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2142         die "Failed to install $sdk_lang SDK: $!";
2143       }
2144     }
2145   }
2146 }
2147
2148 my $python_dir = "$install_dir/python";
2149 if ((-d $python_dir) and can_run("python2.7") and
2150     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
2151   # egg_info failed, probably when it asked git for a build tag.
2152   # Specify no build tag.
2153   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2154   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2155   close($pysdk_cfg);
2156 }
2157
2158 # Hide messages from the install script (unless it fails: shell_or_die
2159 # will show $destdir.log in that case).
2160 open(STDOUT, ">>", "$destdir.log");
2161 open(STDERR, ">&", STDOUT);
2162
2163 if (-e "$destdir/crunch_scripts/install") {
2164     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2165 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2166     # Old version
2167     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2168 } elsif (-e "./install.sh") {
2169     shell_or_die (undef, "./install.sh", $install_dir);
2170 }
2171
2172 if ($archive_hash) {
2173     unlink "$destdir.archive_hash.new";
2174     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2175     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2176 }
2177
2178 close L;
2179
2180 sub can_run {
2181   my $command_name = shift;
2182   open(my $which, "-|", "which", $command_name);
2183   while (<$which>) { }
2184   close($which);
2185   return ($? == 0);
2186 }
2187
2188 sub shell_or_die
2189 {
2190   my $exitcode = shift;
2191
2192   if ($ENV{"DEBUG"}) {
2193     print STDERR "@_\n";
2194   }
2195   if (system (@_) != 0) {
2196     my $err = $!;
2197     my $code = $?;
2198     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2199     open STDERR, ">&STDERR_ORIG";
2200     system ("cat $destdir.log >&2");
2201     warn "@_ failed ($err): $exitstatus";
2202     if (defined($exitcode)) {
2203       exit $exitcode;
2204     }
2205     else {
2206       exit (($code >> 8) || 1);
2207     }
2208   }
2209 }
2210
2211 __DATA__