6146: Better log message.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105   if ($ENV{"USER"} ne "crunch" && $< != 0) {
106     # use a tmp dir unique for my uid
107     $ENV{"CRUNCH_TMP"} .= "-$<";
108   }
109 }
110
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
114 }
115
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
120
121 my %proc;
122 my $force_unlock;
123 my $git_dir;
124 my $jobspec;
125 my $job_api_token;
126 my $no_clear_tmp;
127 my $resume_stash;
128 my $docker_bin = "/usr/bin/docker.io";
129 GetOptions('force-unlock' => \$force_unlock,
130            'git-dir=s' => \$git_dir,
131            'job=s' => \$jobspec,
132            'job-api-token=s' => \$job_api_token,
133            'no-clear-tmp' => \$no_clear_tmp,
134            'resume-stash=s' => \$resume_stash,
135            'docker-bin=s' => \$docker_bin,
136     );
137
138 if (defined $job_api_token) {
139   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
140 }
141
142 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
143
144
145 $SIG{'USR1'} = sub
146 {
147   $main::ENV{CRUNCH_DEBUG} = 1;
148 };
149 $SIG{'USR2'} = sub
150 {
151   $main::ENV{CRUNCH_DEBUG} = 0;
152 };
153
154 my $arv = Arvados->new('apiVersion' => 'v1');
155
156 my $Job;
157 my $job_id;
158 my $dbh;
159 my $sth;
160 my @jobstep;
161
162 my $local_job;
163 if ($jobspec =~ /^[-a-z\d]+$/)
164 {
165   # $jobspec is an Arvados UUID, not a JSON job specification
166   $Job = api_call("jobs/get", uuid => $jobspec);
167   $local_job = 0;
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172   $local_job = 1;
173 }
174
175
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely
178 # misconfigured.
179 my $cmd = ['true'];
180 if ($Job->{docker_image_locator}) {
181   $cmd = [$docker_bin, 'ps', '-q'];
182 }
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
185      $cmd,
186      {fork => 1});
187 if ($? != 0) {
188   Log(undef, "Sanity check failed: ".exit_status_s($?));
189   exit EX_TEMPFAIL;
190 }
191 Log(undef, "Sanity check OK");
192
193
194 my $User = api_call("users/current");
195
196 if (!$local_job) {
197   if (!$force_unlock) {
198     # Claim this job, and make sure nobody else does
199     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
200     if ($@) {
201       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
202       exit EX_TEMPFAIL;
203     };
204   }
205 }
206 else
207 {
208   if (!$resume_stash)
209   {
210     map { croak ("No $_ specified") unless $Job->{$_} }
211     qw(script script_version script_parameters);
212   }
213
214   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
215   $Job->{'started_at'} = gmtime;
216   $Job->{'state'} = 'Running';
217
218   $Job = api_call("jobs/create", job => $Job);
219 }
220 $job_id = $Job->{'uuid'};
221
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
224
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
228
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
230 if ($? == 0) {
231   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232   chomp($gem_versions);
233   chop($gem_versions);  # Closing parentheses
234 } else {
235   $gem_versions = "";
236 }
237 Log(undef,
238     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
239
240 Log (undef, "check slurm allocation");
241 my @slot;
242 my @node;
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
244 my @sinfo;
245 if (!$have_slurm)
246 {
247   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248   push @sinfo, "$localcpus localhost";
249 }
250 if (exists $ENV{SLURM_NODELIST})
251 {
252   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
253 }
254 foreach (@sinfo)
255 {
256   my ($ncpus, $slurm_nodelist) = split;
257   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
258
259   my @nodelist;
260   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
261   {
262     my $nodelist = $1;
263     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
264     {
265       my $ranges = $1;
266       foreach (split (",", $ranges))
267       {
268         my ($a, $b);
269         if (/(\d+)-(\d+)/)
270         {
271           $a = $1;
272           $b = $2;
273         }
274         else
275         {
276           $a = $_;
277           $b = $_;
278         }
279         push @nodelist, map {
280           my $n = $nodelist;
281           $n =~ s/\[[-,\d]+\]/$_/;
282           $n;
283         } ($a..$b);
284       }
285     }
286     else
287     {
288       push @nodelist, $nodelist;
289     }
290   }
291   foreach my $nodename (@nodelist)
292   {
293     Log (undef, "node $nodename - $ncpus slots");
294     my $node = { name => $nodename,
295                  ncpus => $ncpus,
296                  losing_streak => 0,
297                  hold_until => 0 };
298     foreach my $cpu (1..$ncpus)
299     {
300       push @slot, { node => $node,
301                     cpu => $cpu };
302     }
303   }
304   push @node, @nodelist;
305 }
306
307
308
309 # Ensure that we get one jobstep running on each allocated node before
310 # we start overloading nodes with concurrent steps
311
312 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
313
314
315 $Job->update_attributes(
316   'tasks_summary' => { 'failed' => 0,
317                        'todo' => 1,
318                        'running' => 0,
319                        'done' => 0 });
320
321 Log (undef, "start");
322 $SIG{'INT'} = sub { $main::please_freeze = 1; };
323 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
324 $SIG{'TERM'} = \&croak;
325 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
326 $SIG{'ALRM'} = sub { $main::please_info = 1; };
327 $SIG{'CONT'} = sub { $main::please_continue = 1; };
328 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
329
330 $main::please_freeze = 0;
331 $main::please_info = 0;
332 $main::please_continue = 0;
333 $main::please_refresh = 0;
334 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
335
336 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
337 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
338 $ENV{"JOB_UUID"} = $job_id;
339
340
341 my @jobstep_todo = ();
342 my @jobstep_done = ();
343 my @jobstep_tomerge = ();
344 my $jobstep_tomerge_level = 0;
345 my $squeue_checked = 0;
346 my $latest_refresh = scalar time;
347
348
349
350 if (defined $Job->{thawedfromkey})
351 {
352   thaw ($Job->{thawedfromkey});
353 }
354 else
355 {
356   my $first_task = api_call("job_tasks/create", job_task => {
357     'job_uuid' => $Job->{'uuid'},
358     'sequence' => 0,
359     'qsequence' => 0,
360     'parameters' => {},
361   });
362   push @jobstep, { 'level' => 0,
363                    'failures' => 0,
364                    'arvados_task' => $first_task,
365                  };
366   push @jobstep_todo, 0;
367 }
368
369
370 if (!$have_slurm)
371 {
372   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
373 }
374
375 my $build_script = handle_readall(\*DATA);
376 my $nodelist = join(",", @node);
377 my $git_tar_count = 0;
378
379 if (!defined $no_clear_tmp) {
380   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
381   Log (undef, "Clean work dirs");
382
383   my $cleanpid = fork();
384   if ($cleanpid == 0)
385   {
386     # Find FUSE mounts that look like Keep mounts (the mount path has the
387     # word "keep") and unmount them.  Then clean up work directories.
388     # TODO: When #5036 is done and widely deployed, we can get rid of the
389     # regular expression and just unmount everything with type fuse.keep.
390     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
391           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
392     exit (1);
393   }
394   while (1)
395   {
396     last if $cleanpid == waitpid (-1, WNOHANG);
397     freeze_if_want_freeze ($cleanpid);
398     select (undef, undef, undef, 0.1);
399   }
400   Log (undef, "Cleanup command exited ".exit_status_s($?));
401 }
402
403 # If this job requires a Docker image, install that.
404 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
405 if ($docker_locator = $Job->{docker_image_locator}) {
406   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
407   if (!$docker_hash)
408   {
409     croak("No Docker image hash found from locator $docker_locator");
410   }
411   $docker_stream =~ s/^\.//;
412   my $docker_install_script = qq{
413 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
414     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
415 fi
416 };
417   my $docker_pid = fork();
418   if ($docker_pid == 0)
419   {
420     srun (["srun", "--nodelist=" . join(',', @node)],
421           ["/bin/sh", "-ec", $docker_install_script]);
422     exit ($?);
423   }
424   while (1)
425   {
426     last if $docker_pid == waitpid (-1, WNOHANG);
427     freeze_if_want_freeze ($docker_pid);
428     select (undef, undef, undef, 0.1);
429   }
430   if ($? != 0)
431   {
432     croak("Installing Docker image from $docker_locator exited "
433           .exit_status_s($?));
434   }
435
436   # Determine whether this version of Docker supports memory+swap limits.
437   srun(["srun", "--nodelist=" . $node[0]],
438        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
439       {fork => 1});
440   $docker_limitmem = ($? == 0);
441
442   if ($Job->{arvados_sdk_version}) {
443     # The job also specifies an Arvados SDK version.  Add the SDKs to the
444     # tar file for the build script to install.
445     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
446                        $Job->{arvados_sdk_version}));
447     add_git_archive("git", "--git-dir=$git_dir", "archive",
448                     "--prefix=.arvados.sdk/",
449                     $Job->{arvados_sdk_version}, "sdk");
450   }
451 }
452
453 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
454   # If script_version looks like an absolute path, *and* the --git-dir
455   # argument was not given -- which implies we were not invoked by
456   # crunch-dispatch -- we will use the given path as a working
457   # directory instead of resolving script_version to a git commit (or
458   # doing anything else with git).
459   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
460   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
461 }
462 else {
463   # Resolve the given script_version to a git commit sha1. Also, if
464   # the repository is remote, clone it into our local filesystem: this
465   # ensures "git archive" will work, and is necessary to reliably
466   # resolve a symbolic script_version like "master^".
467   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
468
469   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
470
471   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
472
473   # If we're running under crunch-dispatch, it will have already
474   # pulled the appropriate source tree into its own repository, and
475   # given us that repo's path as $git_dir.
476   #
477   # If we're running a "local" job, we might have to fetch content
478   # from a remote repository.
479   #
480   # (Currently crunch-dispatch gives a local path with --git-dir, but
481   # we might as well accept URLs there too in case it changes its
482   # mind.)
483   my $repo = $git_dir || $Job->{'repository'};
484
485   # Repository can be remote or local. If remote, we'll need to fetch it
486   # to a local dir before doing `git log` et al.
487   my $repo_location;
488
489   if ($repo =~ m{://|^[^/]*:}) {
490     # $repo is a git url we can clone, like git:// or https:// or
491     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
492     # not recognized here because distinguishing that from a local
493     # path is too fragile. If you really need something strange here,
494     # use the ssh:// form.
495     $repo_location = 'remote';
496   } elsif ($repo =~ m{^\.*/}) {
497     # $repo is a local path to a git index. We'll also resolve ../foo
498     # to ../foo/.git if the latter is a directory. To help
499     # disambiguate local paths from named hosted repositories, this
500     # form must be given as ./ or ../ if it's a relative path.
501     if (-d "$repo/.git") {
502       $repo = "$repo/.git";
503     }
504     $repo_location = 'local';
505   } else {
506     # $repo is none of the above. It must be the name of a hosted
507     # repository.
508     my $arv_repo_list = api_call("repositories/list",
509                                  'filters' => [['name','=',$repo]]);
510     my @repos_found = @{$arv_repo_list->{'items'}};
511     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
512     if ($n_found > 0) {
513       Log(undef, "Repository '$repo' -> "
514           . join(", ", map { $_->{'uuid'} } @repos_found));
515     }
516     if ($n_found != 1) {
517       croak("Error: Found $n_found repositories with name '$repo'.");
518     }
519     $repo = $repos_found[0]->{'fetch_url'};
520     $repo_location = 'remote';
521   }
522   Log(undef, "Using $repo_location repository '$repo'");
523   $ENV{"CRUNCH_SRC_URL"} = $repo;
524
525   # Resolve given script_version (we'll call that $treeish here) to a
526   # commit sha1 ($commit).
527   my $treeish = $Job->{'script_version'};
528   my $commit;
529   if ($repo_location eq 'remote') {
530     # We minimize excess object-fetching by re-using the same bare
531     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
532     # just keep adding remotes to it as needed.
533     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
534     my $gitcmd = "git --git-dir=\Q$local_repo\E";
535
536     # Set up our local repo for caching remote objects, making
537     # archives, etc.
538     if (!-d $local_repo) {
539       make_path($local_repo) or croak("Error: could not create $local_repo");
540     }
541     # This works (exits 0 and doesn't delete fetched objects) even
542     # if $local_repo is already initialized:
543     `$gitcmd init --bare`;
544     if ($?) {
545       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
546     }
547
548     # If $treeish looks like a hash (or abbrev hash) we look it up in
549     # our local cache first, since that's cheaper. (We don't want to
550     # do that with tags/branches though -- those change over time, so
551     # they should always be resolved by the remote repo.)
552     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
553       # Hide stderr because it's normal for this to fail:
554       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
555       if ($? == 0 &&
556           # Careful not to resolve a branch named abcdeff to commit 1234567:
557           $sha1 =~ /^$treeish/ &&
558           $sha1 =~ /^([0-9a-f]{40})$/s) {
559         $commit = $1;
560         Log(undef, "Commit $commit already present in $local_repo");
561       }
562     }
563
564     if (!defined $commit) {
565       # If $treeish isn't just a hash or abbrev hash, or isn't here
566       # yet, we need to fetch the remote to resolve it correctly.
567
568       # First, remove all local heads. This prevents a name that does
569       # not exist on the remote from resolving to (or colliding with)
570       # a previously fetched branch or tag (possibly from a different
571       # remote).
572       remove_tree("$local_repo/refs/heads", {keep_root => 1});
573
574       Log(undef, "Fetching objects from $repo to $local_repo");
575       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
576       if ($?) {
577         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
578       }
579     }
580
581     # Now that the data is all here, we will use our local repo for
582     # the rest of our git activities.
583     $repo = $local_repo;
584   }
585
586   my $gitcmd = "git --git-dir=\Q$repo\E";
587   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
588   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
589     croak("`$gitcmd rev-list` exited "
590           .exit_status_s($?)
591           .", '$treeish' not found. Giving up.");
592   }
593   $commit = $1;
594   Log(undef, "Version $treeish is commit $commit");
595
596   if ($commit ne $Job->{'script_version'}) {
597     # Record the real commit id in the database, frozentokey, logs,
598     # etc. -- instead of an abbreviation or a branch name which can
599     # become ambiguous or point to a different commit in the future.
600     if (!$Job->update_attributes('script_version' => $commit)) {
601       croak("Error: failed to update job's script_version attribute");
602     }
603   }
604
605   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
606   add_git_archive("$gitcmd archive ''\Q$commit\E");
607 }
608
609 my $git_archive = combined_git_archive();
610 if (!defined $git_archive) {
611   Log(undef, "Skip install phase (no git archive)");
612   if ($have_slurm) {
613     Log(undef, "Warning: This probably means workers have no source tree!");
614   }
615 }
616 else {
617   my $install_exited;
618   my $install_script_tries_left = 3;
619   for (my $attempts = 0; $attempts < 3; $attempts++) {
620     Log(undef, "Run install script on all workers");
621
622     my @srunargs = ("srun",
623                     "--nodelist=$nodelist",
624                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
625     my @execargs = ("sh", "-c",
626                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
627
628     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
629     my ($install_stderr_r, $install_stderr_w);
630     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
631     set_nonblocking($install_stderr_r);
632     my $installpid = fork();
633     if ($installpid == 0)
634     {
635       close($install_stderr_r);
636       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
637       open(STDOUT, ">&", $install_stderr_w);
638       open(STDERR, ">&", $install_stderr_w);
639       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
640       exit (1);
641     }
642     close($install_stderr_w);
643     # Tell freeze_if_want_freeze how to kill the child, otherwise the
644     # "waitpid(installpid)" loop won't get interrupted by a freeze:
645     $proc{$installpid} = {};
646     my $stderr_buf = '';
647     # Track whether anything appears on stderr other than slurm errors
648     # ("srun: ...") and the "starting: ..." message printed by the
649     # srun subroutine itself:
650     my $stderr_anything_from_script = 0;
651     my $match_our_own_errors = '^(srun: error: |starting: \[)';
652     while ($installpid != waitpid(-1, WNOHANG)) {
653       freeze_if_want_freeze ($installpid);
654       # Wait up to 0.1 seconds for something to appear on stderr, then
655       # do a non-blocking read.
656       my $bits = fhbits($install_stderr_r);
657       select ($bits, undef, $bits, 0.1);
658       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
659       {
660         while ($stderr_buf =~ /^(.*?)\n/) {
661           my $line = $1;
662           substr $stderr_buf, 0, 1+length($line), "";
663           Log(undef, "stderr $line");
664           if ($line !~ /$match_our_own_errors/) {
665             $stderr_anything_from_script = 1;
666           }
667         }
668       }
669     }
670     delete $proc{$installpid};
671     $install_exited = $?;
672     close($install_stderr_r);
673     if (length($stderr_buf) > 0) {
674       if ($stderr_buf !~ /$match_our_own_errors/) {
675         $stderr_anything_from_script = 1;
676       }
677       Log(undef, "stderr $stderr_buf")
678     }
679
680     Log (undef, "Install script exited ".exit_status_s($install_exited));
681     last if $install_exited == 0 || $main::please_freeze;
682     # If the install script fails but doesn't print an error message,
683     # the next thing anyone is likely to do is just run it again in
684     # case it was a transient problem like "slurm communication fails
685     # because the network isn't reliable enough". So we'll just do
686     # that ourselves (up to 3 attempts in total). OTOH, if there is an
687     # error message, the problem is more likely to have a real fix and
688     # we should fail the job so the fixing process can start, instead
689     # of doing 2 more attempts.
690     last if $stderr_anything_from_script;
691   }
692
693   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
694     unlink($tar_filename);
695   }
696
697   if ($install_exited != 0) {
698     croak("Giving up");
699   }
700 }
701
702 foreach (qw (script script_version script_parameters runtime_constraints))
703 {
704   Log (undef,
705        "$_ " .
706        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
707 }
708 foreach (split (/\n/, $Job->{knobs}))
709 {
710   Log (undef, "knob " . $_);
711 }
712
713
714
715 $main::success = undef;
716
717
718
719 ONELEVEL:
720
721 my $thisround_succeeded = 0;
722 my $thisround_failed = 0;
723 my $thisround_failed_multiple = 0;
724
725 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
726                        or $a <=> $b } @jobstep_todo;
727 my $level = $jobstep[$jobstep_todo[0]]->{level};
728
729 my $initial_tasks_this_level = 0;
730 foreach my $id (@jobstep_todo) {
731   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
732 }
733
734 # If the number of tasks scheduled at this level #T is smaller than the number
735 # of slots available #S, only use the first #T slots, or the first slot on
736 # each node, whichever number is greater.
737 #
738 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
739 # based on these numbers.  Using fewer slots makes more resources available
740 # to each individual task, which should normally be a better strategy when
741 # there are fewer of them running with less parallelism.
742 #
743 # Note that this calculation is not redone if the initial tasks at
744 # this level queue more tasks at the same level.  This may harm
745 # overall task throughput for that level.
746 my @freeslot;
747 if ($initial_tasks_this_level < @node) {
748   @freeslot = (0..$#node);
749 } elsif ($initial_tasks_this_level < @slot) {
750   @freeslot = (0..$initial_tasks_this_level - 1);
751 } else {
752   @freeslot = (0..$#slot);
753 }
754 my $round_num_freeslots = scalar(@freeslot);
755
756 my %round_max_slots = ();
757 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
758   my $this_slot = $slot[$freeslot[$ii]];
759   my $node_name = $this_slot->{node}->{name};
760   $round_max_slots{$node_name} ||= $this_slot->{cpu};
761   last if (scalar(keys(%round_max_slots)) >= @node);
762 }
763
764 Log(undef, "start level $level with $round_num_freeslots slots");
765 my @holdslot;
766 my %reader;
767 my $progress_is_dirty = 1;
768 my $progress_stats_updated = 0;
769
770 update_progress_stats();
771
772
773 THISROUND:
774 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
775 {
776   my $id = $jobstep_todo[$todo_ptr];
777   my $Jobstep = $jobstep[$id];
778   if ($Jobstep->{level} != $level)
779   {
780     next;
781   }
782
783   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
784   set_nonblocking($reader{$id});
785
786   my $childslot = $freeslot[0];
787   my $childnode = $slot[$childslot]->{node};
788   my $childslotname = join (".",
789                             $slot[$childslot]->{node}->{name},
790                             $slot[$childslot]->{cpu});
791
792   my $childpid = fork();
793   if ($childpid == 0)
794   {
795     $SIG{'INT'} = 'DEFAULT';
796     $SIG{'QUIT'} = 'DEFAULT';
797     $SIG{'TERM'} = 'DEFAULT';
798
799     foreach (values (%reader))
800     {
801       close($_);
802     }
803     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
804     open(STDOUT,">&writer");
805     open(STDERR,">&writer");
806
807     undef $dbh;
808     undef $sth;
809
810     delete $ENV{"GNUPGHOME"};
811     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
812     $ENV{"TASK_QSEQUENCE"} = $id;
813     $ENV{"TASK_SEQUENCE"} = $level;
814     $ENV{"JOB_SCRIPT"} = $Job->{script};
815     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
816       $param =~ tr/a-z/A-Z/;
817       $ENV{"JOB_PARAMETER_$param"} = $value;
818     }
819     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
820     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
821     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
822     $ENV{"HOME"} = $ENV{"TASK_WORK"};
823     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
824     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
825     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
826     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
827
828     $ENV{"GZIP"} = "-n";
829
830     my @srunargs = (
831       "srun",
832       "--nodelist=".$childnode->{name},
833       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
834       "--job-name=$job_id.$id.$$",
835         );
836     my $command =
837         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
838         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
839         ."&& cd $ENV{CRUNCH_TMP} "
840         # These environment variables get used explicitly later in
841         # $command.  No tool is expected to read these values directly.
842         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
843         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
844         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
845         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
846     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
847     if ($docker_hash)
848     {
849       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
850       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
851       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
852       # We only set memory limits if Docker lets us limit both memory and swap.
853       # Memory limits alone have been supported longer, but subprocesses tend
854       # to get SIGKILL if they exceed that without any swap limit set.
855       # See #5642 for additional background.
856       if ($docker_limitmem) {
857         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
858       }
859
860       # Dynamically configure the container to use the host system as its
861       # DNS server.  Get the host's global addresses from the ip command,
862       # and turn them into docker --dns options using gawk.
863       $command .=
864           q{$(ip -o address show scope global |
865               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
866
867       # The source tree and $destdir directory (which we have
868       # installed on the worker host) are available in the container,
869       # under the same path.
870       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
871       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
872
873       # Currently, we make arv-mount's mount point appear at /keep
874       # inside the container (instead of using the same path as the
875       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
876       # crunch scripts and utilities must not rely on this. They must
877       # use $TASK_KEEPMOUNT.
878       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
879       $ENV{TASK_KEEPMOUNT} = "/keep";
880
881       # TASK_WORK is almost exactly like a docker data volume: it
882       # starts out empty, is writable, and persists until no
883       # containers use it any more. We don't use --volumes-from to
884       # share it with other containers: it is only accessible to this
885       # task, and it goes away when this task stops.
886       #
887       # However, a docker data volume is writable only by root unless
888       # the mount point already happens to exist in the container with
889       # different permissions. Therefore, we [1] assume /tmp already
890       # exists in the image and is writable by the crunch user; [2]
891       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
892       # writable if they are created by docker while setting up the
893       # other --volumes); and [3] create $TASK_WORK inside the
894       # container using $build_script.
895       $command .= "--volume=/tmp ";
896       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
897       $ENV{"HOME"} = $ENV{"TASK_WORK"};
898       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
899
900       # TODO: Share a single JOB_WORK volume across all task
901       # containers on a given worker node, and delete it when the job
902       # ends (and, in case that doesn't work, when the next job
903       # starts).
904       #
905       # For now, use the same approach as TASK_WORK above.
906       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
907
908       while (my ($env_key, $env_val) = each %ENV)
909       {
910         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
911           $command .= "--env=\Q$env_key=$env_val\E ";
912         }
913       }
914       $command .= "--env=\QHOME=$ENV{HOME}\E ";
915       $command .= "\Q$docker_hash\E ";
916       $command .= "stdbuf --output=0 --error=0 ";
917       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
918     } else {
919       # Non-docker run
920       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
921       $command .= "stdbuf --output=0 --error=0 ";
922       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
923     }
924
925     my @execargs = ('bash', '-c', $command);
926     srun (\@srunargs, \@execargs, undef, $build_script);
927     # exec() failed, we assume nothing happened.
928     die "srun() failed on build script\n";
929   }
930   close("writer");
931   if (!defined $childpid)
932   {
933     close $reader{$id};
934     delete $reader{$id};
935     next;
936   }
937   shift @freeslot;
938   $proc{$childpid} = { jobstep => $id,
939                        time => time,
940                        slot => $childslot,
941                        jobstepname => "$job_id.$id.$childpid",
942                      };
943   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
944   $slot[$childslot]->{pid} = $childpid;
945
946   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
947   Log ($id, "child $childpid started on $childslotname");
948   $Jobstep->{starttime} = time;
949   $Jobstep->{node} = $childnode->{name};
950   $Jobstep->{slotindex} = $childslot;
951   delete $Jobstep->{stderr};
952   delete $Jobstep->{finishtime};
953
954   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
955   $Jobstep->{'arvados_task'}->save;
956
957   splice @jobstep_todo, $todo_ptr, 1;
958   --$todo_ptr;
959
960   $progress_is_dirty = 1;
961
962   while (!@freeslot
963          ||
964          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
965   {
966     last THISROUND if $main::please_freeze || defined($main::success);
967     if ($main::please_info)
968     {
969       $main::please_info = 0;
970       freeze();
971       create_output_collection();
972       save_meta(1);
973       update_progress_stats();
974     }
975     my $gotsome
976         = readfrompipes ()
977         + reapchildren ();
978     if (!$gotsome)
979     {
980       check_refresh_wanted();
981       check_squeue();
982       update_progress_stats();
983       select (undef, undef, undef, 0.1);
984     }
985     elsif (time - $progress_stats_updated >= 30)
986     {
987       update_progress_stats();
988     }
989     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
990         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
991     {
992       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
993           .($thisround_failed+$thisround_succeeded)
994           .") -- giving up on this round";
995       Log (undef, $message);
996       last THISROUND;
997     }
998
999     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1000     for (my $i=$#freeslot; $i>=0; $i--) {
1001       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1002         push @holdslot, (splice @freeslot, $i, 1);
1003       }
1004     }
1005     for (my $i=$#holdslot; $i>=0; $i--) {
1006       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1007         push @freeslot, (splice @holdslot, $i, 1);
1008       }
1009     }
1010
1011     # give up if no nodes are succeeding
1012     if (!grep { $_->{node}->{losing_streak} == 0 &&
1013                     $_->{node}->{hold_count} < 4 } @slot) {
1014       my $message = "Every node has failed -- giving up on this round";
1015       Log (undef, $message);
1016       last THISROUND;
1017     }
1018   }
1019 }
1020
1021
1022 push @freeslot, splice @holdslot;
1023 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1024
1025
1026 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1027 while (%proc)
1028 {
1029   if ($main::please_continue) {
1030     $main::please_continue = 0;
1031     goto THISROUND;
1032   }
1033   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1034   readfrompipes ();
1035   if (!reapchildren())
1036   {
1037     check_refresh_wanted();
1038     check_squeue();
1039     update_progress_stats();
1040     select (undef, undef, undef, 0.1);
1041     killem (keys %proc) if $main::please_freeze;
1042   }
1043 }
1044
1045 update_progress_stats();
1046 freeze_if_want_freeze();
1047
1048
1049 if (!defined $main::success)
1050 {
1051   if (@jobstep_todo &&
1052       $thisround_succeeded == 0 &&
1053       ($thisround_failed == 0 || $thisround_failed > 4))
1054   {
1055     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1056     Log (undef, $message);
1057     $main::success = 0;
1058   }
1059   if (!@jobstep_todo)
1060   {
1061     $main::success = 1;
1062   }
1063 }
1064
1065 goto ONELEVEL if !defined $main::success;
1066
1067
1068 release_allocation();
1069 freeze();
1070 my $collated_output = &create_output_collection();
1071
1072 if (!$collated_output) {
1073   Log (undef, "Failed to write output collection");
1074 }
1075 else {
1076   Log(undef, "job output $collated_output");
1077   $Job->update_attributes('output' => $collated_output);
1078 }
1079
1080 Log (undef, "finish");
1081
1082 save_meta();
1083
1084 my $final_state;
1085 if ($collated_output && $main::success) {
1086   $final_state = 'Complete';
1087 } else {
1088   $final_state = 'Failed';
1089 }
1090 $Job->update_attributes('state' => $final_state);
1091
1092 exit (($final_state eq 'Complete') ? 0 : 1);
1093
1094
1095
1096 sub update_progress_stats
1097 {
1098   $progress_stats_updated = time;
1099   return if !$progress_is_dirty;
1100   my ($todo, $done, $running) = (scalar @jobstep_todo,
1101                                  scalar @jobstep_done,
1102                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1103   $Job->{'tasks_summary'} ||= {};
1104   $Job->{'tasks_summary'}->{'todo'} = $todo;
1105   $Job->{'tasks_summary'}->{'done'} = $done;
1106   $Job->{'tasks_summary'}->{'running'} = $running;
1107   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1108   Log (undef, "status: $done done, $running running, $todo todo");
1109   $progress_is_dirty = 0;
1110 }
1111
1112
1113
1114 sub reapchildren
1115 {
1116   my $pid = waitpid (-1, WNOHANG);
1117   return 0 if $pid <= 0;
1118
1119   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1120                   . "."
1121                   . $slot[$proc{$pid}->{slot}]->{cpu});
1122   my $jobstepid = $proc{$pid}->{jobstep};
1123   my $elapsed = time - $proc{$pid}->{time};
1124   my $Jobstep = $jobstep[$jobstepid];
1125
1126   my $childstatus = $?;
1127   my $exitvalue = $childstatus >> 8;
1128   my $exitinfo = "exit ".exit_status_s($childstatus);
1129   $Jobstep->{'arvados_task'}->reload;
1130   my $task_success = $Jobstep->{'arvados_task'}->{success};
1131
1132   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1133
1134   if (!defined $task_success) {
1135     # task did not indicate one way or the other --> fail
1136     $Jobstep->{'arvados_task'}->{success} = 0;
1137     $Jobstep->{'arvados_task'}->save;
1138     $task_success = 0;
1139   }
1140
1141   if (!$task_success)
1142   {
1143     my $temporary_fail;
1144     $temporary_fail ||= $Jobstep->{node_fail};
1145     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1146
1147     ++$thisround_failed;
1148     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1149
1150     # Check for signs of a failed or misconfigured node
1151     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1152         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1153       # Don't count this against jobstep failure thresholds if this
1154       # node is already suspected faulty and srun exited quickly
1155       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1156           $elapsed < 5) {
1157         Log ($jobstepid, "blaming failure on suspect node " .
1158              $slot[$proc{$pid}->{slot}]->{node}->{name});
1159         $temporary_fail ||= 1;
1160       }
1161       ban_node_by_slot($proc{$pid}->{slot});
1162     }
1163
1164     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1165                              ++$Jobstep->{'failures'},
1166                              $temporary_fail ? 'temporary' : 'permanent',
1167                              $elapsed));
1168
1169     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1170       # Give up on this task, and the whole job
1171       $main::success = 0;
1172     }
1173     # Put this task back on the todo queue
1174     push @jobstep_todo, $jobstepid;
1175     $Job->{'tasks_summary'}->{'failed'}++;
1176   }
1177   else
1178   {
1179     ++$thisround_succeeded;
1180     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1181     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1182     push @jobstep_done, $jobstepid;
1183     Log ($jobstepid, "success in $elapsed seconds");
1184   }
1185   $Jobstep->{exitcode} = $childstatus;
1186   $Jobstep->{finishtime} = time;
1187   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1188   $Jobstep->{'arvados_task'}->save;
1189   process_stderr ($jobstepid, $task_success);
1190   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1191                            length($Jobstep->{'arvados_task'}->{output}),
1192                            $Jobstep->{'arvados_task'}->{output}));
1193
1194   close $reader{$jobstepid};
1195   delete $reader{$jobstepid};
1196   delete $slot[$proc{$pid}->{slot}]->{pid};
1197   push @freeslot, $proc{$pid}->{slot};
1198   delete $proc{$pid};
1199
1200   if ($task_success) {
1201     # Load new tasks
1202     my $newtask_list = [];
1203     my $newtask_results;
1204     do {
1205       $newtask_results = api_call(
1206         "job_tasks/list",
1207         'where' => {
1208           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1209         },
1210         'order' => 'qsequence',
1211         'offset' => scalar(@$newtask_list),
1212       );
1213       push(@$newtask_list, @{$newtask_results->{items}});
1214     } while (@{$newtask_results->{items}});
1215     foreach my $arvados_task (@$newtask_list) {
1216       my $jobstep = {
1217         'level' => $arvados_task->{'sequence'},
1218         'failures' => 0,
1219         'arvados_task' => $arvados_task
1220       };
1221       push @jobstep, $jobstep;
1222       push @jobstep_todo, $#jobstep;
1223     }
1224   }
1225
1226   $progress_is_dirty = 1;
1227   1;
1228 }
1229
1230 sub check_refresh_wanted
1231 {
1232   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1233   if (@stat && $stat[9] > $latest_refresh) {
1234     $latest_refresh = scalar time;
1235     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1236     for my $attr ('cancelled_at',
1237                   'cancelled_by_user_uuid',
1238                   'cancelled_by_client_uuid',
1239                   'state') {
1240       $Job->{$attr} = $Job2->{$attr};
1241     }
1242     if ($Job->{'state'} ne "Running") {
1243       if ($Job->{'state'} eq "Cancelled") {
1244         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1245       } else {
1246         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1247       }
1248       $main::success = 0;
1249       $main::please_freeze = 1;
1250     }
1251   }
1252 }
1253
1254 sub check_squeue
1255 {
1256   my $last_squeue_check = $squeue_checked;
1257
1258   # Do not call `squeue` or check the kill list more than once every
1259   # 15 seconds.
1260   return if $last_squeue_check > time - 15;
1261   $squeue_checked = time;
1262
1263   # Look for children from which we haven't received stderr data since
1264   # the last squeue check. If no such children exist, all procs are
1265   # alive and there's no need to even look at squeue.
1266   #
1267   # As long as the crunchstat poll interval (10s) is shorter than the
1268   # squeue check interval (15s) this should make the squeue check an
1269   # infrequent event.
1270   my $silent_procs = 0;
1271   for my $jobstep (values %proc)
1272   {
1273     if ($jobstep->{stderr_at} < $last_squeue_check)
1274     {
1275       $silent_procs++;
1276     }
1277   }
1278   return if $silent_procs == 0;
1279
1280   # use killem() on procs whose killtime is reached
1281   while (my ($pid, $jobstep) = each %proc)
1282   {
1283     if (exists $jobstep->{killtime}
1284         && $jobstep->{killtime} <= time
1285         && $jobstep->{stderr_at} < $last_squeue_check)
1286     {
1287       my $sincewhen = "";
1288       if ($jobstep->{stderr_at}) {
1289         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1290       }
1291       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1292       killem ($pid);
1293     }
1294   }
1295
1296   if (!$have_slurm)
1297   {
1298     # here is an opportunity to check for mysterious problems with local procs
1299     return;
1300   }
1301
1302   # get a list of steps still running
1303   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%i %j' --noheader`;
1304   if ($? != 0)
1305   {
1306     Log(undef, "warning: squeue exit status $? ($!)");
1307     return;
1308   }
1309   chop @squeue;
1310
1311   # which of my jobsteps are running, according to squeue?
1312   my %ok;
1313   foreach (@squeue)
1314   {
1315     if (/^(\d+)\.(\d+) (\S+)/)
1316     {
1317       if ($1 eq $ENV{SLURM_JOB_ID})
1318       {
1319         $ok{$3} = 1;
1320       }
1321     }
1322   }
1323
1324   # Check for child procs >60s old and not mentioned by squeue.
1325   while (my ($pid, $jobstep) = each %proc)
1326   {
1327     if ($jobstep->{time} < time - 60
1328         && $jobstep->{jobstepname}
1329         && !exists $ok{$jobstep->{jobstepname}}
1330         && !exists $jobstep->{killtime})
1331     {
1332       # According to slurm, this task has ended (successfully or not)
1333       # -- but our srun child hasn't exited. First we must wait (30
1334       # seconds) in case this is just a race between communication
1335       # channels. Then, if our srun child process still hasn't
1336       # terminated, we'll conclude some slurm communication
1337       # error/delay has caused the task to die without notifying srun,
1338       # and we'll kill srun ourselves.
1339       $jobstep->{killtime} = time + 30;
1340       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1341     }
1342   }
1343 }
1344
1345
1346 sub release_allocation
1347 {
1348   if ($have_slurm)
1349   {
1350     Log (undef, "release job allocation");
1351     system "scancel $ENV{SLURM_JOB_ID}";
1352   }
1353 }
1354
1355
1356 sub readfrompipes
1357 {
1358   my $gotsome = 0;
1359   foreach my $job (keys %reader)
1360   {
1361     my $buf;
1362     while (0 < sysread ($reader{$job}, $buf, 8192))
1363     {
1364       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1365       $jobstep[$job]->{stderr_at} = time;
1366       $jobstep[$job]->{stderr} .= $buf;
1367       preprocess_stderr ($job);
1368       if (length ($jobstep[$job]->{stderr}) > 16384)
1369       {
1370         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1371       }
1372       $gotsome = 1;
1373     }
1374   }
1375   return $gotsome;
1376 }
1377
1378
1379 sub preprocess_stderr
1380 {
1381   my $job = shift;
1382
1383   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1384     my $line = $1;
1385     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1386     Log ($job, "stderr $line");
1387     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1388       # whoa.
1389       $main::please_freeze = 1;
1390     }
1391     elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1392       $jobstep[$job]->{node_fail} = 1;
1393       ban_node_by_slot($jobstep[$job]->{slotindex});
1394     }
1395   }
1396 }
1397
1398
1399 sub process_stderr
1400 {
1401   my $job = shift;
1402   my $task_success = shift;
1403   preprocess_stderr ($job);
1404
1405   map {
1406     Log ($job, "stderr $_");
1407   } split ("\n", $jobstep[$job]->{stderr});
1408 }
1409
1410 sub fetch_block
1411 {
1412   my $hash = shift;
1413   my $keep;
1414   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1415     Log(undef, "fetch_block run error from arv-get $hash: $!");
1416     return undef;
1417   }
1418   my $output_block = "";
1419   while (1) {
1420     my $buf;
1421     my $bytes = sysread($keep, $buf, 1024 * 1024);
1422     if (!defined $bytes) {
1423       Log(undef, "fetch_block read error from arv-get: $!");
1424       $output_block = undef;
1425       last;
1426     } elsif ($bytes == 0) {
1427       # sysread returns 0 at the end of the pipe.
1428       last;
1429     } else {
1430       # some bytes were read into buf.
1431       $output_block .= $buf;
1432     }
1433   }
1434   close $keep;
1435   if ($?) {
1436     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1437     $output_block = undef;
1438   }
1439   return $output_block;
1440 }
1441
1442 # Create a collection by concatenating the output of all tasks (each
1443 # task's output is either a manifest fragment, a locator for a
1444 # manifest fragment stored in Keep, or nothing at all). Return the
1445 # portable_data_hash of the new collection.
1446 sub create_output_collection
1447 {
1448   Log (undef, "collate");
1449
1450   my ($child_out, $child_in);
1451   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1452 import arvados
1453 import sys
1454 print (arvados.api("v1").collections().
1455        create(body={"manifest_text": sys.stdin.read()}).
1456        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1457 }, retry_count());
1458
1459   my $task_idx = -1;
1460   my $manifest_size = 0;
1461   for (@jobstep)
1462   {
1463     ++$task_idx;
1464     my $output = $_->{'arvados_task'}->{output};
1465     next if (!defined($output));
1466     my $next_write;
1467     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1468       $next_write = fetch_block($output);
1469     } else {
1470       $next_write = $output;
1471     }
1472     if (defined($next_write)) {
1473       if (!defined(syswrite($child_in, $next_write))) {
1474         # There's been an error writing.  Stop the loop.
1475         # We'll log details about the exit code later.
1476         last;
1477       } else {
1478         $manifest_size += length($next_write);
1479       }
1480     } else {
1481       my $uuid = $_->{'arvados_task'}->{'uuid'};
1482       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1483       $main::success = 0;
1484     }
1485   }
1486   close($child_in);
1487   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1488
1489   my $joboutput;
1490   my $s = IO::Select->new($child_out);
1491   if ($s->can_read(120)) {
1492     sysread($child_out, $joboutput, 1024 * 1024);
1493     waitpid($pid, 0);
1494     if ($?) {
1495       Log(undef, "output collection creation exited " . exit_status_s($?));
1496       $joboutput = undef;
1497     } else {
1498       chomp($joboutput);
1499     }
1500   } else {
1501     Log (undef, "timed out while creating output collection");
1502     foreach my $signal (2, 2, 2, 15, 15, 9) {
1503       kill($signal, $pid);
1504       last if waitpid($pid, WNOHANG) == -1;
1505       sleep(1);
1506     }
1507   }
1508   close($child_out);
1509
1510   return $joboutput;
1511 }
1512
1513
1514 sub killem
1515 {
1516   foreach (@_)
1517   {
1518     my $sig = 2;                # SIGINT first
1519     if (exists $proc{$_}->{"sent_$sig"} &&
1520         time - $proc{$_}->{"sent_$sig"} > 4)
1521     {
1522       $sig = 15;                # SIGTERM if SIGINT doesn't work
1523     }
1524     if (exists $proc{$_}->{"sent_$sig"} &&
1525         time - $proc{$_}->{"sent_$sig"} > 4)
1526     {
1527       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1528     }
1529     if (!exists $proc{$_}->{"sent_$sig"})
1530     {
1531       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1532       kill $sig, $_;
1533       select (undef, undef, undef, 0.1);
1534       if ($sig == 2)
1535       {
1536         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1537       }
1538       $proc{$_}->{"sent_$sig"} = time;
1539       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1540     }
1541   }
1542 }
1543
1544
1545 sub fhbits
1546 {
1547   my($bits);
1548   for (@_) {
1549     vec($bits,fileno($_),1) = 1;
1550   }
1551   $bits;
1552 }
1553
1554
1555 # Send log output to Keep via arv-put.
1556 #
1557 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1558 # $log_pipe_pid is the pid of the arv-put subprocess.
1559 #
1560 # The only functions that should access these variables directly are:
1561 #
1562 # log_writer_start($logfilename)
1563 #     Starts an arv-put pipe, reading data on stdin and writing it to
1564 #     a $logfilename file in an output collection.
1565 #
1566 # log_writer_send($txt)
1567 #     Writes $txt to the output log collection.
1568 #
1569 # log_writer_finish()
1570 #     Closes the arv-put pipe and returns the output that it produces.
1571 #
1572 # log_writer_is_active()
1573 #     Returns a true value if there is currently a live arv-put
1574 #     process, false otherwise.
1575 #
1576 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1577
1578 sub log_writer_start($)
1579 {
1580   my $logfilename = shift;
1581   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1582                         'arv-put',
1583                         '--portable-data-hash',
1584                         '--project-uuid', $Job->{owner_uuid},
1585                         '--retries', '3',
1586                         '--name', $logfilename,
1587                         '--filename', $logfilename,
1588                         '-');
1589 }
1590
1591 sub log_writer_send($)
1592 {
1593   my $txt = shift;
1594   print $log_pipe_in $txt;
1595 }
1596
1597 sub log_writer_finish()
1598 {
1599   return unless $log_pipe_pid;
1600
1601   close($log_pipe_in);
1602   my $arv_put_output;
1603
1604   my $s = IO::Select->new($log_pipe_out);
1605   if ($s->can_read(120)) {
1606     sysread($log_pipe_out, $arv_put_output, 1024);
1607     chomp($arv_put_output);
1608   } else {
1609     Log (undef, "timed out reading from 'arv-put'");
1610   }
1611
1612   waitpid($log_pipe_pid, 0);
1613   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1614   if ($?) {
1615     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1616   }
1617
1618   return $arv_put_output;
1619 }
1620
1621 sub log_writer_is_active() {
1622   return $log_pipe_pid;
1623 }
1624
1625 sub Log                         # ($jobstep_id, $logmessage)
1626 {
1627   if ($_[1] =~ /\n/) {
1628     for my $line (split (/\n/, $_[1])) {
1629       Log ($_[0], $line);
1630     }
1631     return;
1632   }
1633   my $fh = select STDERR; $|=1; select $fh;
1634   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1635   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1636   $message .= "\n";
1637   my $datetime;
1638   if (log_writer_is_active() || -t STDERR) {
1639     my @gmtime = gmtime;
1640     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1641                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1642   }
1643   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1644
1645   if (log_writer_is_active()) {
1646     log_writer_send($datetime . " " . $message);
1647   }
1648 }
1649
1650
1651 sub croak
1652 {
1653   my ($package, $file, $line) = caller;
1654   my $message = "@_ at $file line $line\n";
1655   Log (undef, $message);
1656   freeze() if @jobstep_todo;
1657   create_output_collection() if @jobstep_todo;
1658   cleanup();
1659   save_meta();
1660   die;
1661 }
1662
1663
1664 sub cleanup
1665 {
1666   return unless $Job;
1667   if ($Job->{'state'} eq 'Cancelled') {
1668     $Job->update_attributes('finished_at' => scalar gmtime);
1669   } else {
1670     $Job->update_attributes('state' => 'Failed');
1671   }
1672 }
1673
1674
1675 sub save_meta
1676 {
1677   my $justcheckpoint = shift; # false if this will be the last meta saved
1678   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1679   return unless log_writer_is_active();
1680
1681   my $loglocator = log_writer_finish();
1682   Log (undef, "log manifest is $loglocator");
1683   $Job->{'log'} = $loglocator;
1684   $Job->update_attributes('log', $loglocator);
1685 }
1686
1687
1688 sub freeze_if_want_freeze
1689 {
1690   if ($main::please_freeze)
1691   {
1692     release_allocation();
1693     if (@_)
1694     {
1695       # kill some srun procs before freeze+stop
1696       map { $proc{$_} = {} } @_;
1697       while (%proc)
1698       {
1699         killem (keys %proc);
1700         select (undef, undef, undef, 0.1);
1701         my $died;
1702         while (($died = waitpid (-1, WNOHANG)) > 0)
1703         {
1704           delete $proc{$died};
1705         }
1706       }
1707     }
1708     freeze();
1709     create_output_collection();
1710     cleanup();
1711     save_meta();
1712     exit 1;
1713   }
1714 }
1715
1716
1717 sub freeze
1718 {
1719   Log (undef, "Freeze not implemented");
1720   return;
1721 }
1722
1723
1724 sub thaw
1725 {
1726   croak ("Thaw not implemented");
1727 }
1728
1729
1730 sub freezequote
1731 {
1732   my $s = shift;
1733   $s =~ s/\\/\\\\/g;
1734   $s =~ s/\n/\\n/g;
1735   return $s;
1736 }
1737
1738
1739 sub freezeunquote
1740 {
1741   my $s = shift;
1742   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1743   return $s;
1744 }
1745
1746
1747 sub srun
1748 {
1749   my $srunargs = shift;
1750   my $execargs = shift;
1751   my $opts = shift || {};
1752   my $stdin = shift;
1753   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1754
1755   $Data::Dumper::Terse = 1;
1756   $Data::Dumper::Indent = 0;
1757   my $show_cmd = Dumper($args);
1758   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1759   $show_cmd =~ s/\n/ /g;
1760   if ($opts->{fork}) {
1761     Log(undef, "starting: $show_cmd");
1762   } else {
1763     # This is a child process: parent is in charge of reading our
1764     # stderr and copying it to Log() if needed.
1765     warn "starting: $show_cmd\n";
1766   }
1767
1768   if (defined $stdin) {
1769     my $child = open STDIN, "-|";
1770     defined $child or die "no fork: $!";
1771     if ($child == 0) {
1772       print $stdin or die $!;
1773       close STDOUT or die $!;
1774       exit 0;
1775     }
1776   }
1777
1778   return system (@$args) if $opts->{fork};
1779
1780   exec @$args;
1781   warn "ENV size is ".length(join(" ",%ENV));
1782   die "exec failed: $!: @$args";
1783 }
1784
1785
1786 sub ban_node_by_slot {
1787   # Don't start any new jobsteps on this node for 60 seconds
1788   my $slotid = shift;
1789   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1790   $slot[$slotid]->{node}->{hold_count}++;
1791   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1792 }
1793
1794 sub must_lock_now
1795 {
1796   my ($lockfile, $error_message) = @_;
1797   open L, ">", $lockfile or croak("$lockfile: $!");
1798   if (!flock L, LOCK_EX|LOCK_NB) {
1799     croak("Can't lock $lockfile: $error_message\n");
1800   }
1801 }
1802
1803 sub find_docker_image {
1804   # Given a Keep locator, check to see if it contains a Docker image.
1805   # If so, return its stream name and Docker hash.
1806   # If not, return undef for both values.
1807   my $locator = shift;
1808   my ($streamname, $filename);
1809   my $image = api_call("collections/get", uuid => $locator);
1810   if ($image) {
1811     foreach my $line (split(/\n/, $image->{manifest_text})) {
1812       my @tokens = split(/\s+/, $line);
1813       next if (!@tokens);
1814       $streamname = shift(@tokens);
1815       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1816         if (defined($filename)) {
1817           return (undef, undef);  # More than one file in the Collection.
1818         } else {
1819           $filename = (split(/:/, $filedata, 3))[2];
1820         }
1821       }
1822     }
1823   }
1824   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1825     return ($streamname, $1);
1826   } else {
1827     return (undef, undef);
1828   }
1829 }
1830
1831 sub retry_count {
1832   # Calculate the number of times an operation should be retried,
1833   # assuming exponential backoff, and that we're willing to retry as
1834   # long as tasks have been running.  Enforce a minimum of 3 retries.
1835   my ($starttime, $endtime, $timediff, $retries);
1836   if (@jobstep) {
1837     $starttime = $jobstep[0]->{starttime};
1838     $endtime = $jobstep[-1]->{finishtime};
1839   }
1840   if (!defined($starttime)) {
1841     $timediff = 0;
1842   } elsif (!defined($endtime)) {
1843     $timediff = time - $starttime;
1844   } else {
1845     $timediff = ($endtime - $starttime) - (time - $endtime);
1846   }
1847   if ($timediff > 0) {
1848     $retries = int(log($timediff) / log(2));
1849   } else {
1850     $retries = 1;  # Use the minimum.
1851   }
1852   return ($retries > 3) ? $retries : 3;
1853 }
1854
1855 sub retry_op {
1856   # Pass in two function references.
1857   # This method will be called with the remaining arguments.
1858   # If it dies, retry it with exponential backoff until it succeeds,
1859   # or until the current retry_count is exhausted.  After each failure
1860   # that can be retried, the second function will be called with
1861   # the current try count (0-based), next try time, and error message.
1862   my $operation = shift;
1863   my $retry_callback = shift;
1864   my $retries = retry_count();
1865   foreach my $try_count (0..$retries) {
1866     my $next_try = time + (2 ** $try_count);
1867     my $result = eval { $operation->(@_); };
1868     if (!$@) {
1869       return $result;
1870     } elsif ($try_count < $retries) {
1871       $retry_callback->($try_count, $next_try, $@);
1872       my $sleep_time = $next_try - time;
1873       sleep($sleep_time) if ($sleep_time > 0);
1874     }
1875   }
1876   # Ensure the error message ends in a newline, so Perl doesn't add
1877   # retry_op's line number to it.
1878   chomp($@);
1879   die($@ . "\n");
1880 }
1881
1882 sub api_call {
1883   # Pass in a /-separated API method name, and arguments for it.
1884   # This function will call that method, retrying as needed until
1885   # the current retry_count is exhausted, with a log on the first failure.
1886   my $method_name = shift;
1887   my $log_api_retry = sub {
1888     my ($try_count, $next_try_at, $errmsg) = @_;
1889     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1890     $errmsg =~ s/\s/ /g;
1891     $errmsg =~ s/\s+$//;
1892     my $retry_msg;
1893     if ($next_try_at < time) {
1894       $retry_msg = "Retrying.";
1895     } else {
1896       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1897       $retry_msg = "Retrying at $next_try_fmt.";
1898     }
1899     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1900   };
1901   my $method = $arv;
1902   foreach my $key (split(/\//, $method_name)) {
1903     $method = $method->{$key};
1904   }
1905   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1906 }
1907
1908 sub exit_status_s {
1909   # Given a $?, return a human-readable exit code string like "0" or
1910   # "1" or "0 with signal 1" or "1 with signal 11".
1911   my $exitcode = shift;
1912   my $s = $exitcode >> 8;
1913   if ($exitcode & 0x7f) {
1914     $s .= " with signal " . ($exitcode & 0x7f);
1915   }
1916   if ($exitcode & 0x80) {
1917     $s .= " with core dump";
1918   }
1919   return $s;
1920 }
1921
1922 sub handle_readall {
1923   # Pass in a glob reference to a file handle.
1924   # Read all its contents and return them as a string.
1925   my $fh_glob_ref = shift;
1926   local $/ = undef;
1927   return <$fh_glob_ref>;
1928 }
1929
1930 sub tar_filename_n {
1931   my $n = shift;
1932   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1933 }
1934
1935 sub add_git_archive {
1936   # Pass in a git archive command as a string or list, a la system().
1937   # This method will save its output to be included in the archive sent to the
1938   # build script.
1939   my $git_input;
1940   $git_tar_count++;
1941   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1942     croak("Failed to save git archive: $!");
1943   }
1944   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1945   close($git_input);
1946   waitpid($git_pid, 0);
1947   close(GIT_ARCHIVE);
1948   if ($?) {
1949     croak("Failed to save git archive: git exited " . exit_status_s($?));
1950   }
1951 }
1952
1953 sub combined_git_archive {
1954   # Combine all saved tar archives into a single archive, then return its
1955   # contents in a string.  Return undef if no archives have been saved.
1956   if ($git_tar_count < 1) {
1957     return undef;
1958   }
1959   my $base_tar_name = tar_filename_n(1);
1960   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1961     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1962     if ($tar_exit != 0) {
1963       croak("Error preparing build archive: tar -A exited " .
1964             exit_status_s($tar_exit));
1965     }
1966   }
1967   if (!open(GIT_TAR, "<", $base_tar_name)) {
1968     croak("Could not open build archive: $!");
1969   }
1970   my $tar_contents = handle_readall(\*GIT_TAR);
1971   close(GIT_TAR);
1972   return $tar_contents;
1973 }
1974
1975 sub set_nonblocking {
1976   my $fh = shift;
1977   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
1978   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
1979 }
1980
1981 __DATA__
1982 #!/usr/bin/perl
1983 #
1984 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1985 # server invokes this script on individual compute nodes, or localhost if we're
1986 # running a job locally.  It gets called in two modes:
1987 #
1988 # * No arguments: Installation mode.  Read a tar archive from the DATA
1989 #   file handle; it includes the Crunch script's source code, and
1990 #   maybe SDKs as well.  Those should be installed in the proper
1991 #   locations.  This runs outside of any Docker container, so don't try to
1992 #   introspect Crunch's runtime environment.
1993 #
1994 # * With arguments: Crunch script run mode.  This script should set up the
1995 #   environment, then run the command specified in the arguments.  This runs
1996 #   inside any Docker container.
1997
1998 use Fcntl ':flock';
1999 use File::Path qw( make_path remove_tree );
2000 use POSIX qw(getcwd);
2001
2002 use constant TASK_TEMPFAIL => 111;
2003
2004 # Map SDK subdirectories to the path environments they belong to.
2005 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2006
2007 my $destdir = $ENV{"CRUNCH_SRC"};
2008 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2009 my $repo = $ENV{"CRUNCH_SRC_URL"};
2010 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2011 my $job_work = $ENV{"JOB_WORK"};
2012 my $task_work = $ENV{"TASK_WORK"};
2013
2014 open(STDOUT_ORIG, ">&", STDOUT);
2015 open(STDERR_ORIG, ">&", STDERR);
2016
2017 for my $dir ($destdir, $job_work, $task_work) {
2018   if ($dir) {
2019     make_path $dir;
2020     -e $dir or die "Failed to create temporary directory ($dir): $!";
2021   }
2022 }
2023
2024 if ($task_work) {
2025   remove_tree($task_work, {keep_root => 1});
2026 }
2027
2028 ### Crunch script run mode
2029 if (@ARGV) {
2030   # We want to do routine logging during task 0 only.  This gives the user
2031   # the information they need, but avoids repeating the information for every
2032   # task.
2033   my $Log;
2034   if ($ENV{TASK_SEQUENCE} eq "0") {
2035     $Log = sub {
2036       my $msg = shift;
2037       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2038     };
2039   } else {
2040     $Log = sub { };
2041   }
2042
2043   my $python_src = "$install_dir/python";
2044   my $venv_dir = "$job_work/.arvados.venv";
2045   my $venv_built = -e "$venv_dir/bin/activate";
2046   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2047     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2048                  "--python=python2.7", $venv_dir);
2049     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2050     $venv_built = 1;
2051     $Log->("Built Python SDK virtualenv");
2052   }
2053
2054   my $pip_bin = "pip";
2055   if ($venv_built) {
2056     $Log->("Running in Python SDK virtualenv");
2057     $pip_bin = "$venv_dir/bin/pip";
2058     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2059     @ARGV = ("/bin/sh", "-ec",
2060              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2061   } elsif (-d $python_src) {
2062     $Log->("Warning: virtualenv not found inside Docker container default " .
2063            "\$PATH. Can't install Python SDK.");
2064   }
2065
2066   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2067   if ($pkgs) {
2068     $Log->("Using Arvados SDK:");
2069     foreach my $line (split /\n/, $pkgs) {
2070       $Log->($line);
2071     }
2072   } else {
2073     $Log->("Arvados SDK packages not found");
2074   }
2075
2076   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2077     my $sdk_path = "$install_dir/$sdk_dir";
2078     if (-d $sdk_path) {
2079       if ($ENV{$sdk_envkey}) {
2080         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2081       } else {
2082         $ENV{$sdk_envkey} = $sdk_path;
2083       }
2084       $Log->("Arvados SDK added to %s", $sdk_envkey);
2085     }
2086   }
2087
2088   exec(@ARGV);
2089   die "Cannot exec `@ARGV`: $!";
2090 }
2091
2092 ### Installation mode
2093 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2094 flock L, LOCK_EX;
2095 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2096   # This exact git archive (source + arvados sdk) is already installed
2097   # here, so there's no need to reinstall it.
2098
2099   # We must consume our DATA section, though: otherwise the process
2100   # feeding it to us will get SIGPIPE.
2101   my $buf;
2102   while (read(DATA, $buf, 65536)) { }
2103
2104   exit(0);
2105 }
2106
2107 unlink "$destdir.archive_hash";
2108 mkdir $destdir;
2109
2110 do {
2111   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2112   local $SIG{PIPE} = "IGNORE";
2113   warn "Extracting archive: $archive_hash\n";
2114   # --ignore-zeros is necessary sometimes: depending on how much NUL
2115   # padding tar -A put on our combined archive (which in turn depends
2116   # on the length of the component archives) tar without
2117   # --ignore-zeros will exit before consuming stdin and cause close()
2118   # to fail on the resulting SIGPIPE.
2119   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2120     die "Error launching 'tar -xC $destdir': $!";
2121   }
2122   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2123   # get SIGPIPE.  We must feed it data incrementally.
2124   my $tar_input;
2125   while (read(DATA, $tar_input, 65536)) {
2126     print TARX $tar_input;
2127   }
2128   if(!close(TARX)) {
2129     die "'tar -xC $destdir' exited $?: $!";
2130   }
2131 };
2132
2133 mkdir $install_dir;
2134
2135 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2136 if (-d $sdk_root) {
2137   foreach my $sdk_lang (("python",
2138                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2139     if (-d "$sdk_root/$sdk_lang") {
2140       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2141         die "Failed to install $sdk_lang SDK: $!";
2142       }
2143     }
2144   }
2145 }
2146
2147 my $python_dir = "$install_dir/python";
2148 if ((-d $python_dir) and can_run("python2.7") and
2149     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
2150   # egg_info failed, probably when it asked git for a build tag.
2151   # Specify no build tag.
2152   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2153   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2154   close($pysdk_cfg);
2155 }
2156
2157 # Hide messages from the install script (unless it fails: shell_or_die
2158 # will show $destdir.log in that case).
2159 open(STDOUT, ">>", "$destdir.log");
2160 open(STDERR, ">&", STDOUT);
2161
2162 if (-e "$destdir/crunch_scripts/install") {
2163     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2164 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2165     # Old version
2166     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2167 } elsif (-e "./install.sh") {
2168     shell_or_die (undef, "./install.sh", $install_dir);
2169 }
2170
2171 if ($archive_hash) {
2172     unlink "$destdir.archive_hash.new";
2173     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2174     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2175 }
2176
2177 close L;
2178
2179 sub can_run {
2180   my $command_name = shift;
2181   open(my $which, "-|", "which", $command_name);
2182   while (<$which>) { }
2183   close($which);
2184   return ($? == 0);
2185 }
2186
2187 sub shell_or_die
2188 {
2189   my $exitcode = shift;
2190
2191   if ($ENV{"DEBUG"}) {
2192     print STDERR "@_\n";
2193   }
2194   if (system (@_) != 0) {
2195     my $err = $!;
2196     my $code = $?;
2197     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2198     open STDERR, ">&STDERR_ORIG";
2199     system ("cat $destdir.log >&2");
2200     warn "@_ failed ($err): $exitstatus";
2201     if (defined($exitcode)) {
2202       exit $exitcode;
2203     }
2204     else {
2205       exit (($code >> 8) || 1);
2206     }
2207   }
2208 }
2209
2210 __DATA__