6146: Use new SLURM_JOB_ID env var instead of old SLURM_JOBID
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105   if ($ENV{"USER"} ne "crunch" && $< != 0) {
106     # use a tmp dir unique for my uid
107     $ENV{"CRUNCH_TMP"} .= "-$<";
108   }
109 }
110
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
114 }
115
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
120
121 my %proc;
122 my $force_unlock;
123 my $git_dir;
124 my $jobspec;
125 my $job_api_token;
126 my $no_clear_tmp;
127 my $resume_stash;
128 my $docker_bin = "/usr/bin/docker.io";
129 GetOptions('force-unlock' => \$force_unlock,
130            'git-dir=s' => \$git_dir,
131            'job=s' => \$jobspec,
132            'job-api-token=s' => \$job_api_token,
133            'no-clear-tmp' => \$no_clear_tmp,
134            'resume-stash=s' => \$resume_stash,
135            'docker-bin=s' => \$docker_bin,
136     );
137
138 if (defined $job_api_token) {
139   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
140 }
141
142 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
143
144
145 $SIG{'USR1'} = sub
146 {
147   $main::ENV{CRUNCH_DEBUG} = 1;
148 };
149 $SIG{'USR2'} = sub
150 {
151   $main::ENV{CRUNCH_DEBUG} = 0;
152 };
153
154 my $arv = Arvados->new('apiVersion' => 'v1');
155
156 my $Job;
157 my $job_id;
158 my $dbh;
159 my $sth;
160 my @jobstep;
161
162 my $local_job;
163 if ($jobspec =~ /^[-a-z\d]+$/)
164 {
165   # $jobspec is an Arvados UUID, not a JSON job specification
166   $Job = api_call("jobs/get", uuid => $jobspec);
167   $local_job = 0;
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172   $local_job = 1;
173 }
174
175
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely
178 # misconfigured.
179 my $cmd = ['true'];
180 if ($Job->{docker_image_locator}) {
181   $cmd = [$docker_bin, 'ps', '-q'];
182 }
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
185      $cmd,
186      {fork => 1});
187 if ($? != 0) {
188   Log(undef, "Sanity check failed: ".exit_status_s($?));
189   exit EX_TEMPFAIL;
190 }
191 Log(undef, "Sanity check OK");
192
193
194 my $User = api_call("users/current");
195
196 if (!$local_job) {
197   if (!$force_unlock) {
198     # Claim this job, and make sure nobody else does
199     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
200     if ($@) {
201       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
202       exit EX_TEMPFAIL;
203     };
204   }
205 }
206 else
207 {
208   if (!$resume_stash)
209   {
210     map { croak ("No $_ specified") unless $Job->{$_} }
211     qw(script script_version script_parameters);
212   }
213
214   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
215   $Job->{'started_at'} = gmtime;
216   $Job->{'state'} = 'Running';
217
218   $Job = api_call("jobs/create", job => $Job);
219 }
220 $job_id = $Job->{'uuid'};
221
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
224
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
228
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
230 if ($? == 0) {
231   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232   chomp($gem_versions);
233   chop($gem_versions);  # Closing parentheses
234 } else {
235   $gem_versions = "";
236 }
237 Log(undef,
238     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
239
240 Log (undef, "check slurm allocation");
241 my @slot;
242 my @node;
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
244 my @sinfo;
245 if (!$have_slurm)
246 {
247   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248   push @sinfo, "$localcpus localhost";
249 }
250 if (exists $ENV{SLURM_NODELIST})
251 {
252   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
253 }
254 foreach (@sinfo)
255 {
256   my ($ncpus, $slurm_nodelist) = split;
257   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
258
259   my @nodelist;
260   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
261   {
262     my $nodelist = $1;
263     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
264     {
265       my $ranges = $1;
266       foreach (split (",", $ranges))
267       {
268         my ($a, $b);
269         if (/(\d+)-(\d+)/)
270         {
271           $a = $1;
272           $b = $2;
273         }
274         else
275         {
276           $a = $_;
277           $b = $_;
278         }
279         push @nodelist, map {
280           my $n = $nodelist;
281           $n =~ s/\[[-,\d]+\]/$_/;
282           $n;
283         } ($a..$b);
284       }
285     }
286     else
287     {
288       push @nodelist, $nodelist;
289     }
290   }
291   foreach my $nodename (@nodelist)
292   {
293     Log (undef, "node $nodename - $ncpus slots");
294     my $node = { name => $nodename,
295                  ncpus => $ncpus,
296                  losing_streak => 0,
297                  hold_until => 0 };
298     foreach my $cpu (1..$ncpus)
299     {
300       push @slot, { node => $node,
301                     cpu => $cpu };
302     }
303   }
304   push @node, @nodelist;
305 }
306
307
308
309 # Ensure that we get one jobstep running on each allocated node before
310 # we start overloading nodes with concurrent steps
311
312 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
313
314
315 $Job->update_attributes(
316   'tasks_summary' => { 'failed' => 0,
317                        'todo' => 1,
318                        'running' => 0,
319                        'done' => 0 });
320
321 Log (undef, "start");
322 $SIG{'INT'} = sub { $main::please_freeze = 1; };
323 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
324 $SIG{'TERM'} = \&croak;
325 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
326 $SIG{'ALRM'} = sub { $main::please_info = 1; };
327 $SIG{'CONT'} = sub { $main::please_continue = 1; };
328 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
329
330 $main::please_freeze = 0;
331 $main::please_info = 0;
332 $main::please_continue = 0;
333 $main::please_refresh = 0;
334 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
335
336 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
337 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
338 $ENV{"JOB_UUID"} = $job_id;
339
340
341 my @jobstep_todo = ();
342 my @jobstep_done = ();
343 my @jobstep_tomerge = ();
344 my $jobstep_tomerge_level = 0;
345 my $squeue_checked = 0;
346 my $latest_refresh = scalar time;
347
348
349
350 if (defined $Job->{thawedfromkey})
351 {
352   thaw ($Job->{thawedfromkey});
353 }
354 else
355 {
356   my $first_task = api_call("job_tasks/create", job_task => {
357     'job_uuid' => $Job->{'uuid'},
358     'sequence' => 0,
359     'qsequence' => 0,
360     'parameters' => {},
361   });
362   push @jobstep, { 'level' => 0,
363                    'failures' => 0,
364                    'arvados_task' => $first_task,
365                  };
366   push @jobstep_todo, 0;
367 }
368
369
370 if (!$have_slurm)
371 {
372   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
373 }
374
375 my $build_script = handle_readall(\*DATA);
376 my $nodelist = join(",", @node);
377 my $git_tar_count = 0;
378
379 if (!defined $no_clear_tmp) {
380   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
381   Log (undef, "Clean work dirs");
382
383   my $cleanpid = fork();
384   if ($cleanpid == 0)
385   {
386     # Find FUSE mounts that look like Keep mounts (the mount path has the
387     # word "keep") and unmount them.  Then clean up work directories.
388     # TODO: When #5036 is done and widely deployed, we can get rid of the
389     # regular expression and just unmount everything with type fuse.keep.
390     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
391           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
392     exit (1);
393   }
394   while (1)
395   {
396     last if $cleanpid == waitpid (-1, WNOHANG);
397     freeze_if_want_freeze ($cleanpid);
398     select (undef, undef, undef, 0.1);
399   }
400   Log (undef, "Cleanup command exited ".exit_status_s($?));
401 }
402
403 # If this job requires a Docker image, install that.
404 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
405 if ($docker_locator = $Job->{docker_image_locator}) {
406   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
407   if (!$docker_hash)
408   {
409     croak("No Docker image hash found from locator $docker_locator");
410   }
411   $docker_stream =~ s/^\.//;
412   my $docker_install_script = qq{
413 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
414     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
415 fi
416 };
417   my $docker_pid = fork();
418   if ($docker_pid == 0)
419   {
420     srun (["srun", "--nodelist=" . join(',', @node)],
421           ["/bin/sh", "-ec", $docker_install_script]);
422     exit ($?);
423   }
424   while (1)
425   {
426     last if $docker_pid == waitpid (-1, WNOHANG);
427     freeze_if_want_freeze ($docker_pid);
428     select (undef, undef, undef, 0.1);
429   }
430   if ($? != 0)
431   {
432     croak("Installing Docker image from $docker_locator exited "
433           .exit_status_s($?));
434   }
435
436   # Determine whether this version of Docker supports memory+swap limits.
437   srun(["srun", "--nodelist=" . $node[0]],
438        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
439       {fork => 1});
440   $docker_limitmem = ($? == 0);
441
442   if ($Job->{arvados_sdk_version}) {
443     # The job also specifies an Arvados SDK version.  Add the SDKs to the
444     # tar file for the build script to install.
445     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
446                        $Job->{arvados_sdk_version}));
447     add_git_archive("git", "--git-dir=$git_dir", "archive",
448                     "--prefix=.arvados.sdk/",
449                     $Job->{arvados_sdk_version}, "sdk");
450   }
451 }
452
453 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
454   # If script_version looks like an absolute path, *and* the --git-dir
455   # argument was not given -- which implies we were not invoked by
456   # crunch-dispatch -- we will use the given path as a working
457   # directory instead of resolving script_version to a git commit (or
458   # doing anything else with git).
459   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
460   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
461 }
462 else {
463   # Resolve the given script_version to a git commit sha1. Also, if
464   # the repository is remote, clone it into our local filesystem: this
465   # ensures "git archive" will work, and is necessary to reliably
466   # resolve a symbolic script_version like "master^".
467   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
468
469   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
470
471   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
472
473   # If we're running under crunch-dispatch, it will have already
474   # pulled the appropriate source tree into its own repository, and
475   # given us that repo's path as $git_dir.
476   #
477   # If we're running a "local" job, we might have to fetch content
478   # from a remote repository.
479   #
480   # (Currently crunch-dispatch gives a local path with --git-dir, but
481   # we might as well accept URLs there too in case it changes its
482   # mind.)
483   my $repo = $git_dir || $Job->{'repository'};
484
485   # Repository can be remote or local. If remote, we'll need to fetch it
486   # to a local dir before doing `git log` et al.
487   my $repo_location;
488
489   if ($repo =~ m{://|^[^/]*:}) {
490     # $repo is a git url we can clone, like git:// or https:// or
491     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
492     # not recognized here because distinguishing that from a local
493     # path is too fragile. If you really need something strange here,
494     # use the ssh:// form.
495     $repo_location = 'remote';
496   } elsif ($repo =~ m{^\.*/}) {
497     # $repo is a local path to a git index. We'll also resolve ../foo
498     # to ../foo/.git if the latter is a directory. To help
499     # disambiguate local paths from named hosted repositories, this
500     # form must be given as ./ or ../ if it's a relative path.
501     if (-d "$repo/.git") {
502       $repo = "$repo/.git";
503     }
504     $repo_location = 'local';
505   } else {
506     # $repo is none of the above. It must be the name of a hosted
507     # repository.
508     my $arv_repo_list = api_call("repositories/list",
509                                  'filters' => [['name','=',$repo]]);
510     my @repos_found = @{$arv_repo_list->{'items'}};
511     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
512     if ($n_found > 0) {
513       Log(undef, "Repository '$repo' -> "
514           . join(", ", map { $_->{'uuid'} } @repos_found));
515     }
516     if ($n_found != 1) {
517       croak("Error: Found $n_found repositories with name '$repo'.");
518     }
519     $repo = $repos_found[0]->{'fetch_url'};
520     $repo_location = 'remote';
521   }
522   Log(undef, "Using $repo_location repository '$repo'");
523   $ENV{"CRUNCH_SRC_URL"} = $repo;
524
525   # Resolve given script_version (we'll call that $treeish here) to a
526   # commit sha1 ($commit).
527   my $treeish = $Job->{'script_version'};
528   my $commit;
529   if ($repo_location eq 'remote') {
530     # We minimize excess object-fetching by re-using the same bare
531     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
532     # just keep adding remotes to it as needed.
533     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
534     my $gitcmd = "git --git-dir=\Q$local_repo\E";
535
536     # Set up our local repo for caching remote objects, making
537     # archives, etc.
538     if (!-d $local_repo) {
539       make_path($local_repo) or croak("Error: could not create $local_repo");
540     }
541     # This works (exits 0 and doesn't delete fetched objects) even
542     # if $local_repo is already initialized:
543     `$gitcmd init --bare`;
544     if ($?) {
545       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
546     }
547
548     # If $treeish looks like a hash (or abbrev hash) we look it up in
549     # our local cache first, since that's cheaper. (We don't want to
550     # do that with tags/branches though -- those change over time, so
551     # they should always be resolved by the remote repo.)
552     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
553       # Hide stderr because it's normal for this to fail:
554       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
555       if ($? == 0 &&
556           # Careful not to resolve a branch named abcdeff to commit 1234567:
557           $sha1 =~ /^$treeish/ &&
558           $sha1 =~ /^([0-9a-f]{40})$/s) {
559         $commit = $1;
560         Log(undef, "Commit $commit already present in $local_repo");
561       }
562     }
563
564     if (!defined $commit) {
565       # If $treeish isn't just a hash or abbrev hash, or isn't here
566       # yet, we need to fetch the remote to resolve it correctly.
567
568       # First, remove all local heads. This prevents a name that does
569       # not exist on the remote from resolving to (or colliding with)
570       # a previously fetched branch or tag (possibly from a different
571       # remote).
572       remove_tree("$local_repo/refs/heads", {keep_root => 1});
573
574       Log(undef, "Fetching objects from $repo to $local_repo");
575       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
576       if ($?) {
577         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
578       }
579     }
580
581     # Now that the data is all here, we will use our local repo for
582     # the rest of our git activities.
583     $repo = $local_repo;
584   }
585
586   my $gitcmd = "git --git-dir=\Q$repo\E";
587   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
588   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
589     croak("`$gitcmd rev-list` exited "
590           .exit_status_s($?)
591           .", '$treeish' not found. Giving up.");
592   }
593   $commit = $1;
594   Log(undef, "Version $treeish is commit $commit");
595
596   if ($commit ne $Job->{'script_version'}) {
597     # Record the real commit id in the database, frozentokey, logs,
598     # etc. -- instead of an abbreviation or a branch name which can
599     # become ambiguous or point to a different commit in the future.
600     if (!$Job->update_attributes('script_version' => $commit)) {
601       croak("Error: failed to update job's script_version attribute");
602     }
603   }
604
605   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
606   add_git_archive("$gitcmd archive ''\Q$commit\E");
607 }
608
609 my $git_archive = combined_git_archive();
610 if (!defined $git_archive) {
611   Log(undef, "Skip install phase (no git archive)");
612   if ($have_slurm) {
613     Log(undef, "Warning: This probably means workers have no source tree!");
614   }
615 }
616 else {
617   my $install_exited;
618   my $install_script_tries_left = 3;
619   for (my $attempts = 0; $attempts < 3; $attempts++) {
620     Log(undef, "Run install script on all workers");
621
622     my @srunargs = ("srun",
623                     "--nodelist=$nodelist",
624                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
625     my @execargs = ("sh", "-c",
626                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
627
628     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
629     my ($install_stderr_r, $install_stderr_w);
630     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
631     set_nonblocking($install_stderr_r);
632     my $installpid = fork();
633     if ($installpid == 0)
634     {
635       close($install_stderr_r);
636       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
637       open(STDOUT, ">&", $install_stderr_w);
638       open(STDERR, ">&", $install_stderr_w);
639       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
640       exit (1);
641     }
642     close($install_stderr_w);
643     # Tell freeze_if_want_freeze how to kill the child, otherwise the
644     # "waitpid(installpid)" loop won't get interrupted by a freeze:
645     $proc{$installpid} = {};
646     my $stderr_buf = '';
647     # Track whether anything appears on stderr other than slurm errors
648     # ("srun: ...") and the "starting: ..." message printed by the
649     # srun subroutine itself:
650     my $stderr_anything_from_script = 0;
651     my $match_our_own_errors = '^(srun: error: |starting: \[)';
652     while ($installpid != waitpid(-1, WNOHANG)) {
653       freeze_if_want_freeze ($installpid);
654       # Wait up to 0.1 seconds for something to appear on stderr, then
655       # do a non-blocking read.
656       my $bits = fhbits($install_stderr_r);
657       select ($bits, undef, $bits, 0.1);
658       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
659       {
660         while ($stderr_buf =~ /^(.*?)\n/) {
661           my $line = $1;
662           substr $stderr_buf, 0, 1+length($line), "";
663           Log(undef, "stderr $line");
664           if ($line !~ /$match_our_own_errors/) {
665             $stderr_anything_from_script = 1;
666           }
667         }
668       }
669     }
670     delete $proc{$installpid};
671     $install_exited = $?;
672     close($install_stderr_r);
673     if (length($stderr_buf) > 0) {
674       if ($stderr_buf !~ /$match_our_own_errors/) {
675         $stderr_anything_from_script = 1;
676       }
677       Log(undef, "stderr $stderr_buf")
678     }
679
680     Log (undef, "Install script exited ".exit_status_s($install_exited));
681     last if $install_exited == 0 || $main::please_freeze;
682     # If the install script fails but doesn't print an error message,
683     # the next thing anyone is likely to do is just run it again in
684     # case it was a transient problem like "slurm communication fails
685     # because the network isn't reliable enough". So we'll just do
686     # that ourselves (up to 3 attempts in total). OTOH, if there is an
687     # error message, the problem is more likely to have a real fix and
688     # we should fail the job so the fixing process can start, instead
689     # of doing 2 more attempts.
690     last if $stderr_anything_from_script;
691   }
692
693   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
694     unlink($tar_filename);
695   }
696
697   if ($install_exited != 0) {
698     croak("Giving up");
699   }
700 }
701
702 foreach (qw (script script_version script_parameters runtime_constraints))
703 {
704   Log (undef,
705        "$_ " .
706        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
707 }
708 foreach (split (/\n/, $Job->{knobs}))
709 {
710   Log (undef, "knob " . $_);
711 }
712
713
714
715 $main::success = undef;
716
717
718
719 ONELEVEL:
720
721 my $thisround_succeeded = 0;
722 my $thisround_failed = 0;
723 my $thisround_failed_multiple = 0;
724
725 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
726                        or $a <=> $b } @jobstep_todo;
727 my $level = $jobstep[$jobstep_todo[0]]->{level};
728
729 my $initial_tasks_this_level = 0;
730 foreach my $id (@jobstep_todo) {
731   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
732 }
733
734 # If the number of tasks scheduled at this level #T is smaller than the number
735 # of slots available #S, only use the first #T slots, or the first slot on
736 # each node, whichever number is greater.
737 #
738 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
739 # based on these numbers.  Using fewer slots makes more resources available
740 # to each individual task, which should normally be a better strategy when
741 # there are fewer of them running with less parallelism.
742 #
743 # Note that this calculation is not redone if the initial tasks at
744 # this level queue more tasks at the same level.  This may harm
745 # overall task throughput for that level.
746 my @freeslot;
747 if ($initial_tasks_this_level < @node) {
748   @freeslot = (0..$#node);
749 } elsif ($initial_tasks_this_level < @slot) {
750   @freeslot = (0..$initial_tasks_this_level - 1);
751 } else {
752   @freeslot = (0..$#slot);
753 }
754 my $round_num_freeslots = scalar(@freeslot);
755
756 my %round_max_slots = ();
757 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
758   my $this_slot = $slot[$freeslot[$ii]];
759   my $node_name = $this_slot->{node}->{name};
760   $round_max_slots{$node_name} ||= $this_slot->{cpu};
761   last if (scalar(keys(%round_max_slots)) >= @node);
762 }
763
764 Log(undef, "start level $level with $round_num_freeslots slots");
765 my @holdslot;
766 my %reader;
767 my $progress_is_dirty = 1;
768 my $progress_stats_updated = 0;
769
770 update_progress_stats();
771
772
773 THISROUND:
774 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
775 {
776   my $id = $jobstep_todo[$todo_ptr];
777   my $Jobstep = $jobstep[$id];
778   if ($Jobstep->{level} != $level)
779   {
780     next;
781   }
782
783   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
784   set_nonblocking($reader{$id});
785
786   my $childslot = $freeslot[0];
787   my $childnode = $slot[$childslot]->{node};
788   my $childslotname = join (".",
789                             $slot[$childslot]->{node}->{name},
790                             $slot[$childslot]->{cpu});
791
792   my $childpid = fork();
793   if ($childpid == 0)
794   {
795     $SIG{'INT'} = 'DEFAULT';
796     $SIG{'QUIT'} = 'DEFAULT';
797     $SIG{'TERM'} = 'DEFAULT';
798
799     foreach (values (%reader))
800     {
801       close($_);
802     }
803     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
804     open(STDOUT,">&writer");
805     open(STDERR,">&writer");
806
807     undef $dbh;
808     undef $sth;
809
810     delete $ENV{"GNUPGHOME"};
811     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
812     $ENV{"TASK_QSEQUENCE"} = $id;
813     $ENV{"TASK_SEQUENCE"} = $level;
814     $ENV{"JOB_SCRIPT"} = $Job->{script};
815     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
816       $param =~ tr/a-z/A-Z/;
817       $ENV{"JOB_PARAMETER_$param"} = $value;
818     }
819     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
820     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
821     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
822     $ENV{"HOME"} = $ENV{"TASK_WORK"};
823     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
824     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
825     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
826     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
827
828     $ENV{"GZIP"} = "-n";
829
830     my @srunargs = (
831       "srun",
832       "--nodelist=".$childnode->{name},
833       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
834       "--job-name=$job_id.$id.$$",
835         );
836     my $command =
837         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
838         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
839         ."&& cd $ENV{CRUNCH_TMP} "
840         # These environment variables get used explicitly later in
841         # $command.  No tool is expected to read these values directly.
842         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
843         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
844         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
845         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
846     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
847     if ($docker_hash)
848     {
849       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
850       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
851       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
852       # We only set memory limits if Docker lets us limit both memory and swap.
853       # Memory limits alone have been supported longer, but subprocesses tend
854       # to get SIGKILL if they exceed that without any swap limit set.
855       # See #5642 for additional background.
856       if ($docker_limitmem) {
857         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
858       }
859
860       # Dynamically configure the container to use the host system as its
861       # DNS server.  Get the host's global addresses from the ip command,
862       # and turn them into docker --dns options using gawk.
863       $command .=
864           q{$(ip -o address show scope global |
865               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
866
867       # The source tree and $destdir directory (which we have
868       # installed on the worker host) are available in the container,
869       # under the same path.
870       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
871       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
872
873       # Currently, we make arv-mount's mount point appear at /keep
874       # inside the container (instead of using the same path as the
875       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
876       # crunch scripts and utilities must not rely on this. They must
877       # use $TASK_KEEPMOUNT.
878       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
879       $ENV{TASK_KEEPMOUNT} = "/keep";
880
881       # TASK_WORK is almost exactly like a docker data volume: it
882       # starts out empty, is writable, and persists until no
883       # containers use it any more. We don't use --volumes-from to
884       # share it with other containers: it is only accessible to this
885       # task, and it goes away when this task stops.
886       #
887       # However, a docker data volume is writable only by root unless
888       # the mount point already happens to exist in the container with
889       # different permissions. Therefore, we [1] assume /tmp already
890       # exists in the image and is writable by the crunch user; [2]
891       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
892       # writable if they are created by docker while setting up the
893       # other --volumes); and [3] create $TASK_WORK inside the
894       # container using $build_script.
895       $command .= "--volume=/tmp ";
896       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
897       $ENV{"HOME"} = $ENV{"TASK_WORK"};
898       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
899
900       # TODO: Share a single JOB_WORK volume across all task
901       # containers on a given worker node, and delete it when the job
902       # ends (and, in case that doesn't work, when the next job
903       # starts).
904       #
905       # For now, use the same approach as TASK_WORK above.
906       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
907
908       while (my ($env_key, $env_val) = each %ENV)
909       {
910         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
911           $command .= "--env=\Q$env_key=$env_val\E ";
912         }
913       }
914       $command .= "--env=\QHOME=$ENV{HOME}\E ";
915       $command .= "\Q$docker_hash\E ";
916       $command .= "stdbuf --output=0 --error=0 ";
917       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
918     } else {
919       # Non-docker run
920       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
921       $command .= "stdbuf --output=0 --error=0 ";
922       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
923     }
924
925     my @execargs = ('bash', '-c', $command);
926     srun (\@srunargs, \@execargs, undef, $build_script);
927     # exec() failed, we assume nothing happened.
928     die "srun() failed on build script\n";
929   }
930   close("writer");
931   if (!defined $childpid)
932   {
933     close $reader{$id};
934     delete $reader{$id};
935     next;
936   }
937   shift @freeslot;
938   $proc{$childpid} = { jobstep => $id,
939                        time => time,
940                        slot => $childslot,
941                        jobstepname => "$job_id.$id.$childpid",
942                      };
943   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
944   $slot[$childslot]->{pid} = $childpid;
945
946   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
947   Log ($id, "child $childpid started on $childslotname");
948   $Jobstep->{starttime} = time;
949   $Jobstep->{node} = $childnode->{name};
950   $Jobstep->{slotindex} = $childslot;
951   delete $Jobstep->{stderr};
952   delete $Jobstep->{finishtime};
953
954   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
955   $Jobstep->{'arvados_task'}->save;
956
957   splice @jobstep_todo, $todo_ptr, 1;
958   --$todo_ptr;
959
960   $progress_is_dirty = 1;
961
962   while (!@freeslot
963          ||
964          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
965   {
966     last THISROUND if $main::please_freeze || defined($main::success);
967     if ($main::please_info)
968     {
969       $main::please_info = 0;
970       freeze();
971       create_output_collection();
972       save_meta(1);
973       update_progress_stats();
974     }
975     my $gotsome
976         = readfrompipes ()
977         + reapchildren ();
978     if (!$gotsome)
979     {
980       check_refresh_wanted();
981       check_squeue();
982       update_progress_stats();
983       select (undef, undef, undef, 0.1);
984     }
985     elsif (time - $progress_stats_updated >= 30)
986     {
987       update_progress_stats();
988     }
989     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
990         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
991     {
992       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
993           .($thisround_failed+$thisround_succeeded)
994           .") -- giving up on this round";
995       Log (undef, $message);
996       last THISROUND;
997     }
998
999     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1000     for (my $i=$#freeslot; $i>=0; $i--) {
1001       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1002         push @holdslot, (splice @freeslot, $i, 1);
1003       }
1004     }
1005     for (my $i=$#holdslot; $i>=0; $i--) {
1006       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1007         push @freeslot, (splice @holdslot, $i, 1);
1008       }
1009     }
1010
1011     # give up if no nodes are succeeding
1012     if (!grep { $_->{node}->{losing_streak} == 0 &&
1013                     $_->{node}->{hold_count} < 4 } @slot) {
1014       my $message = "Every node has failed -- giving up on this round";
1015       Log (undef, $message);
1016       last THISROUND;
1017     }
1018   }
1019 }
1020
1021
1022 push @freeslot, splice @holdslot;
1023 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1024
1025
1026 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1027 while (%proc)
1028 {
1029   if ($main::please_continue) {
1030     $main::please_continue = 0;
1031     goto THISROUND;
1032   }
1033   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1034   readfrompipes ();
1035   if (!reapchildren())
1036   {
1037     check_refresh_wanted();
1038     check_squeue();
1039     update_progress_stats();
1040     select (undef, undef, undef, 0.1);
1041     killem (keys %proc) if $main::please_freeze;
1042   }
1043 }
1044
1045 update_progress_stats();
1046 freeze_if_want_freeze();
1047
1048
1049 if (!defined $main::success)
1050 {
1051   if (@jobstep_todo &&
1052       $thisround_succeeded == 0 &&
1053       ($thisround_failed == 0 || $thisround_failed > 4))
1054   {
1055     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1056     Log (undef, $message);
1057     $main::success = 0;
1058   }
1059   if (!@jobstep_todo)
1060   {
1061     $main::success = 1;
1062   }
1063 }
1064
1065 goto ONELEVEL if !defined $main::success;
1066
1067
1068 release_allocation();
1069 freeze();
1070 my $collated_output = &create_output_collection();
1071
1072 if (!$collated_output) {
1073   Log (undef, "Failed to write output collection");
1074 }
1075 else {
1076   Log(undef, "job output $collated_output");
1077   $Job->update_attributes('output' => $collated_output);
1078 }
1079
1080 Log (undef, "finish");
1081
1082 save_meta();
1083
1084 my $final_state;
1085 if ($collated_output && $main::success) {
1086   $final_state = 'Complete';
1087 } else {
1088   $final_state = 'Failed';
1089 }
1090 $Job->update_attributes('state' => $final_state);
1091
1092 exit (($final_state eq 'Complete') ? 0 : 1);
1093
1094
1095
1096 sub update_progress_stats
1097 {
1098   $progress_stats_updated = time;
1099   return if !$progress_is_dirty;
1100   my ($todo, $done, $running) = (scalar @jobstep_todo,
1101                                  scalar @jobstep_done,
1102                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1103   $Job->{'tasks_summary'} ||= {};
1104   $Job->{'tasks_summary'}->{'todo'} = $todo;
1105   $Job->{'tasks_summary'}->{'done'} = $done;
1106   $Job->{'tasks_summary'}->{'running'} = $running;
1107   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1108   Log (undef, "status: $done done, $running running, $todo todo");
1109   $progress_is_dirty = 0;
1110 }
1111
1112
1113
1114 sub reapchildren
1115 {
1116   my $pid = waitpid (-1, WNOHANG);
1117   return 0 if $pid <= 0;
1118
1119   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1120                   . "."
1121                   . $slot[$proc{$pid}->{slot}]->{cpu});
1122   my $jobstepid = $proc{$pid}->{jobstep};
1123   my $elapsed = time - $proc{$pid}->{time};
1124   my $Jobstep = $jobstep[$jobstepid];
1125
1126   my $childstatus = $?;
1127   my $exitvalue = $childstatus >> 8;
1128   my $exitinfo = "exit ".exit_status_s($childstatus);
1129   $Jobstep->{'arvados_task'}->reload;
1130   my $task_success = $Jobstep->{'arvados_task'}->{success};
1131
1132   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1133
1134   if (!defined $task_success) {
1135     # task did not indicate one way or the other --> fail
1136     $Jobstep->{'arvados_task'}->{success} = 0;
1137     $Jobstep->{'arvados_task'}->save;
1138     $task_success = 0;
1139   }
1140
1141   if (!$task_success)
1142   {
1143     my $temporary_fail;
1144     $temporary_fail ||= $Jobstep->{node_fail};
1145     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1146
1147     ++$thisround_failed;
1148     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1149
1150     # Check for signs of a failed or misconfigured node
1151     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1152         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1153       # Don't count this against jobstep failure thresholds if this
1154       # node is already suspected faulty and srun exited quickly
1155       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1156           $elapsed < 5) {
1157         Log ($jobstepid, "blaming failure on suspect node " .
1158              $slot[$proc{$pid}->{slot}]->{node}->{name});
1159         $temporary_fail ||= 1;
1160       }
1161       ban_node_by_slot($proc{$pid}->{slot});
1162     }
1163
1164     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1165                              ++$Jobstep->{'failures'},
1166                              $temporary_fail ? 'temporary' : 'permanent',
1167                              $elapsed));
1168
1169     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1170       # Give up on this task, and the whole job
1171       $main::success = 0;
1172     }
1173     # Put this task back on the todo queue
1174     push @jobstep_todo, $jobstepid;
1175     $Job->{'tasks_summary'}->{'failed'}++;
1176   }
1177   else
1178   {
1179     ++$thisround_succeeded;
1180     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1181     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1182     push @jobstep_done, $jobstepid;
1183     Log ($jobstepid, "success in $elapsed seconds");
1184   }
1185   $Jobstep->{exitcode} = $childstatus;
1186   $Jobstep->{finishtime} = time;
1187   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1188   $Jobstep->{'arvados_task'}->save;
1189   process_stderr ($jobstepid, $task_success);
1190   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1191                            length($Jobstep->{'arvados_task'}->{output}),
1192                            $Jobstep->{'arvados_task'}->{output}));
1193
1194   close $reader{$jobstepid};
1195   delete $reader{$jobstepid};
1196   delete $slot[$proc{$pid}->{slot}]->{pid};
1197   push @freeslot, $proc{$pid}->{slot};
1198   delete $proc{$pid};
1199
1200   if ($task_success) {
1201     # Load new tasks
1202     my $newtask_list = [];
1203     my $newtask_results;
1204     do {
1205       $newtask_results = api_call(
1206         "job_tasks/list",
1207         'where' => {
1208           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1209         },
1210         'order' => 'qsequence',
1211         'offset' => scalar(@$newtask_list),
1212       );
1213       push(@$newtask_list, @{$newtask_results->{items}});
1214     } while (@{$newtask_results->{items}});
1215     foreach my $arvados_task (@$newtask_list) {
1216       my $jobstep = {
1217         'level' => $arvados_task->{'sequence'},
1218         'failures' => 0,
1219         'arvados_task' => $arvados_task
1220       };
1221       push @jobstep, $jobstep;
1222       push @jobstep_todo, $#jobstep;
1223     }
1224   }
1225
1226   $progress_is_dirty = 1;
1227   1;
1228 }
1229
1230 sub check_refresh_wanted
1231 {
1232   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1233   if (@stat && $stat[9] > $latest_refresh) {
1234     $latest_refresh = scalar time;
1235     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1236     for my $attr ('cancelled_at',
1237                   'cancelled_by_user_uuid',
1238                   'cancelled_by_client_uuid',
1239                   'state') {
1240       $Job->{$attr} = $Job2->{$attr};
1241     }
1242     if ($Job->{'state'} ne "Running") {
1243       if ($Job->{'state'} eq "Cancelled") {
1244         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1245       } else {
1246         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1247       }
1248       $main::success = 0;
1249       $main::please_freeze = 1;
1250     }
1251   }
1252 }
1253
1254 sub check_squeue
1255 {
1256   my $last_squeue_check = $squeue_checked;
1257
1258   # Do not call `squeue` or check the kill list more than once every
1259   # 15 seconds.
1260   return if $last_squeue_check > time - 15;
1261   $squeue_checked = time;
1262
1263   # Look for children from which we haven't received stderr data since
1264   # the last squeue check. If no such children exist, all procs are
1265   # alive and there's no need to even look at squeue.
1266   #
1267   # As long as the crunchstat poll interval (10s) is shorter than the
1268   # squeue check interval (15s) this should make the squeue check an
1269   # infrequent event.
1270   my $silent_procs = 0;
1271   for my $jobstep (values %proc)
1272   {
1273     if ($jobstep->{stderr_at} < $last_squeue_check)
1274     {
1275       $silent_procs++;
1276     }
1277   }
1278   return if $silent_procs == 0;
1279
1280   # use killem() on procs whose killtime is reached
1281   while (my ($pid, $jobstep) = each %proc)
1282   {
1283     if (exists $jobstep->{killtime}
1284         && $jobstep->{killtime} <= time
1285         && $jobstep->{stderr_at} < $last_squeue_check)
1286     {
1287       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task disappeared from slurm queue)");
1288       killem ($pid);
1289     }
1290   }
1291
1292   if (!$have_slurm)
1293   {
1294     # here is an opportunity to check for mysterious problems with local procs
1295     return;
1296   }
1297
1298   # get a list of steps still running
1299   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%i %j' --noheader`;
1300   if ($? != 0)
1301   {
1302     Log(undef, "warning: squeue exit status $? ($!)");
1303     return;
1304   }
1305   chop @squeue;
1306
1307   # which of my jobsteps are running, according to squeue?
1308   my %ok;
1309   foreach (@squeue)
1310   {
1311     if (/^(\d+)\.(\d+) (\S+)/)
1312     {
1313       if ($1 eq $ENV{SLURM_JOB_ID})
1314       {
1315         $ok{$3} = 1;
1316       }
1317     }
1318   }
1319
1320   # Check for child procs >60s old and not mentioned by squeue.
1321   while (my ($pid, $jobstep) = each %proc)
1322   {
1323     if ($jobstep->{time} < time - 60
1324         && $jobstep->{jobstepname}
1325         && !exists $ok{$jobstep->{jobstepname}}
1326         && !exists $jobstep->{killtime})
1327     {
1328       # According to slurm, this task has ended (successfully or not)
1329       # -- but our srun child hasn't exited. First we must wait (30
1330       # seconds) in case this is just a race between communication
1331       # channels. Then, if our srun child process still hasn't
1332       # terminated, we'll conclude some slurm communication
1333       # error/delay has caused the task to die without notifying srun,
1334       # and we'll kill srun ourselves.
1335       $jobstep->{killtime} = time + 30;
1336       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1337     }
1338   }
1339 }
1340
1341
1342 sub release_allocation
1343 {
1344   if ($have_slurm)
1345   {
1346     Log (undef, "release job allocation");
1347     system "scancel $ENV{SLURM_JOB_ID}";
1348   }
1349 }
1350
1351
1352 sub readfrompipes
1353 {
1354   my $gotsome = 0;
1355   foreach my $job (keys %reader)
1356   {
1357     my $buf;
1358     while (0 < sysread ($reader{$job}, $buf, 8192))
1359     {
1360       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1361       $jobstep[$job]->{stderr_at} = time;
1362       $jobstep[$job]->{stderr} .= $buf;
1363       preprocess_stderr ($job);
1364       if (length ($jobstep[$job]->{stderr}) > 16384)
1365       {
1366         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1367       }
1368       $gotsome = 1;
1369     }
1370   }
1371   return $gotsome;
1372 }
1373
1374
1375 sub preprocess_stderr
1376 {
1377   my $job = shift;
1378
1379   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1380     my $line = $1;
1381     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1382     Log ($job, "stderr $line");
1383     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1384       # whoa.
1385       $main::please_freeze = 1;
1386     }
1387     elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1388       $jobstep[$job]->{node_fail} = 1;
1389       ban_node_by_slot($jobstep[$job]->{slotindex});
1390     }
1391   }
1392 }
1393
1394
1395 sub process_stderr
1396 {
1397   my $job = shift;
1398   my $task_success = shift;
1399   preprocess_stderr ($job);
1400
1401   map {
1402     Log ($job, "stderr $_");
1403   } split ("\n", $jobstep[$job]->{stderr});
1404 }
1405
1406 sub fetch_block
1407 {
1408   my $hash = shift;
1409   my $keep;
1410   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1411     Log(undef, "fetch_block run error from arv-get $hash: $!");
1412     return undef;
1413   }
1414   my $output_block = "";
1415   while (1) {
1416     my $buf;
1417     my $bytes = sysread($keep, $buf, 1024 * 1024);
1418     if (!defined $bytes) {
1419       Log(undef, "fetch_block read error from arv-get: $!");
1420       $output_block = undef;
1421       last;
1422     } elsif ($bytes == 0) {
1423       # sysread returns 0 at the end of the pipe.
1424       last;
1425     } else {
1426       # some bytes were read into buf.
1427       $output_block .= $buf;
1428     }
1429   }
1430   close $keep;
1431   if ($?) {
1432     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1433     $output_block = undef;
1434   }
1435   return $output_block;
1436 }
1437
1438 # Create a collection by concatenating the output of all tasks (each
1439 # task's output is either a manifest fragment, a locator for a
1440 # manifest fragment stored in Keep, or nothing at all). Return the
1441 # portable_data_hash of the new collection.
1442 sub create_output_collection
1443 {
1444   Log (undef, "collate");
1445
1446   my ($child_out, $child_in);
1447   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1448 import arvados
1449 import sys
1450 print (arvados.api("v1").collections().
1451        create(body={"manifest_text": sys.stdin.read()}).
1452        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1453 }, retry_count());
1454
1455   my $task_idx = -1;
1456   my $manifest_size = 0;
1457   for (@jobstep)
1458   {
1459     ++$task_idx;
1460     my $output = $_->{'arvados_task'}->{output};
1461     next if (!defined($output));
1462     my $next_write;
1463     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1464       $next_write = fetch_block($output);
1465     } else {
1466       $next_write = $output;
1467     }
1468     if (defined($next_write)) {
1469       if (!defined(syswrite($child_in, $next_write))) {
1470         # There's been an error writing.  Stop the loop.
1471         # We'll log details about the exit code later.
1472         last;
1473       } else {
1474         $manifest_size += length($next_write);
1475       }
1476     } else {
1477       my $uuid = $_->{'arvados_task'}->{'uuid'};
1478       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1479       $main::success = 0;
1480     }
1481   }
1482   close($child_in);
1483   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1484
1485   my $joboutput;
1486   my $s = IO::Select->new($child_out);
1487   if ($s->can_read(120)) {
1488     sysread($child_out, $joboutput, 1024 * 1024);
1489     waitpid($pid, 0);
1490     if ($?) {
1491       Log(undef, "output collection creation exited " . exit_status_s($?));
1492       $joboutput = undef;
1493     } else {
1494       chomp($joboutput);
1495     }
1496   } else {
1497     Log (undef, "timed out while creating output collection");
1498     foreach my $signal (2, 2, 2, 15, 15, 9) {
1499       kill($signal, $pid);
1500       last if waitpid($pid, WNOHANG) == -1;
1501       sleep(1);
1502     }
1503   }
1504   close($child_out);
1505
1506   return $joboutput;
1507 }
1508
1509
1510 sub killem
1511 {
1512   foreach (@_)
1513   {
1514     my $sig = 2;                # SIGINT first
1515     if (exists $proc{$_}->{"sent_$sig"} &&
1516         time - $proc{$_}->{"sent_$sig"} > 4)
1517     {
1518       $sig = 15;                # SIGTERM if SIGINT doesn't work
1519     }
1520     if (exists $proc{$_}->{"sent_$sig"} &&
1521         time - $proc{$_}->{"sent_$sig"} > 4)
1522     {
1523       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1524     }
1525     if (!exists $proc{$_}->{"sent_$sig"})
1526     {
1527       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1528       kill $sig, $_;
1529       select (undef, undef, undef, 0.1);
1530       if ($sig == 2)
1531       {
1532         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1533       }
1534       $proc{$_}->{"sent_$sig"} = time;
1535       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1536     }
1537   }
1538 }
1539
1540
1541 sub fhbits
1542 {
1543   my($bits);
1544   for (@_) {
1545     vec($bits,fileno($_),1) = 1;
1546   }
1547   $bits;
1548 }
1549
1550
1551 # Send log output to Keep via arv-put.
1552 #
1553 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1554 # $log_pipe_pid is the pid of the arv-put subprocess.
1555 #
1556 # The only functions that should access these variables directly are:
1557 #
1558 # log_writer_start($logfilename)
1559 #     Starts an arv-put pipe, reading data on stdin and writing it to
1560 #     a $logfilename file in an output collection.
1561 #
1562 # log_writer_send($txt)
1563 #     Writes $txt to the output log collection.
1564 #
1565 # log_writer_finish()
1566 #     Closes the arv-put pipe and returns the output that it produces.
1567 #
1568 # log_writer_is_active()
1569 #     Returns a true value if there is currently a live arv-put
1570 #     process, false otherwise.
1571 #
1572 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1573
1574 sub log_writer_start($)
1575 {
1576   my $logfilename = shift;
1577   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1578                         'arv-put',
1579                         '--portable-data-hash',
1580                         '--project-uuid', $Job->{owner_uuid},
1581                         '--retries', '3',
1582                         '--name', $logfilename,
1583                         '--filename', $logfilename,
1584                         '-');
1585 }
1586
1587 sub log_writer_send($)
1588 {
1589   my $txt = shift;
1590   print $log_pipe_in $txt;
1591 }
1592
1593 sub log_writer_finish()
1594 {
1595   return unless $log_pipe_pid;
1596
1597   close($log_pipe_in);
1598   my $arv_put_output;
1599
1600   my $s = IO::Select->new($log_pipe_out);
1601   if ($s->can_read(120)) {
1602     sysread($log_pipe_out, $arv_put_output, 1024);
1603     chomp($arv_put_output);
1604   } else {
1605     Log (undef, "timed out reading from 'arv-put'");
1606   }
1607
1608   waitpid($log_pipe_pid, 0);
1609   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1610   if ($?) {
1611     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1612   }
1613
1614   return $arv_put_output;
1615 }
1616
1617 sub log_writer_is_active() {
1618   return $log_pipe_pid;
1619 }
1620
1621 sub Log                         # ($jobstep_id, $logmessage)
1622 {
1623   if ($_[1] =~ /\n/) {
1624     for my $line (split (/\n/, $_[1])) {
1625       Log ($_[0], $line);
1626     }
1627     return;
1628   }
1629   my $fh = select STDERR; $|=1; select $fh;
1630   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1631   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1632   $message .= "\n";
1633   my $datetime;
1634   if (log_writer_is_active() || -t STDERR) {
1635     my @gmtime = gmtime;
1636     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1637                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1638   }
1639   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1640
1641   if (log_writer_is_active()) {
1642     log_writer_send($datetime . " " . $message);
1643   }
1644 }
1645
1646
1647 sub croak
1648 {
1649   my ($package, $file, $line) = caller;
1650   my $message = "@_ at $file line $line\n";
1651   Log (undef, $message);
1652   freeze() if @jobstep_todo;
1653   create_output_collection() if @jobstep_todo;
1654   cleanup();
1655   save_meta();
1656   die;
1657 }
1658
1659
1660 sub cleanup
1661 {
1662   return unless $Job;
1663   if ($Job->{'state'} eq 'Cancelled') {
1664     $Job->update_attributes('finished_at' => scalar gmtime);
1665   } else {
1666     $Job->update_attributes('state' => 'Failed');
1667   }
1668 }
1669
1670
1671 sub save_meta
1672 {
1673   my $justcheckpoint = shift; # false if this will be the last meta saved
1674   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1675   return unless log_writer_is_active();
1676
1677   my $loglocator = log_writer_finish();
1678   Log (undef, "log manifest is $loglocator");
1679   $Job->{'log'} = $loglocator;
1680   $Job->update_attributes('log', $loglocator);
1681 }
1682
1683
1684 sub freeze_if_want_freeze
1685 {
1686   if ($main::please_freeze)
1687   {
1688     release_allocation();
1689     if (@_)
1690     {
1691       # kill some srun procs before freeze+stop
1692       map { $proc{$_} = {} } @_;
1693       while (%proc)
1694       {
1695         killem (keys %proc);
1696         select (undef, undef, undef, 0.1);
1697         my $died;
1698         while (($died = waitpid (-1, WNOHANG)) > 0)
1699         {
1700           delete $proc{$died};
1701         }
1702       }
1703     }
1704     freeze();
1705     create_output_collection();
1706     cleanup();
1707     save_meta();
1708     exit 1;
1709   }
1710 }
1711
1712
1713 sub freeze
1714 {
1715   Log (undef, "Freeze not implemented");
1716   return;
1717 }
1718
1719
1720 sub thaw
1721 {
1722   croak ("Thaw not implemented");
1723 }
1724
1725
1726 sub freezequote
1727 {
1728   my $s = shift;
1729   $s =~ s/\\/\\\\/g;
1730   $s =~ s/\n/\\n/g;
1731   return $s;
1732 }
1733
1734
1735 sub freezeunquote
1736 {
1737   my $s = shift;
1738   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1739   return $s;
1740 }
1741
1742
1743 sub srun
1744 {
1745   my $srunargs = shift;
1746   my $execargs = shift;
1747   my $opts = shift || {};
1748   my $stdin = shift;
1749   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1750
1751   $Data::Dumper::Terse = 1;
1752   $Data::Dumper::Indent = 0;
1753   my $show_cmd = Dumper($args);
1754   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1755   $show_cmd =~ s/\n/ /g;
1756   if ($opts->{fork}) {
1757     Log(undef, "starting: $show_cmd");
1758   } else {
1759     # This is a child process: parent is in charge of reading our
1760     # stderr and copying it to Log() if needed.
1761     warn "starting: $show_cmd\n";
1762   }
1763
1764   if (defined $stdin) {
1765     my $child = open STDIN, "-|";
1766     defined $child or die "no fork: $!";
1767     if ($child == 0) {
1768       print $stdin or die $!;
1769       close STDOUT or die $!;
1770       exit 0;
1771     }
1772   }
1773
1774   return system (@$args) if $opts->{fork};
1775
1776   exec @$args;
1777   warn "ENV size is ".length(join(" ",%ENV));
1778   die "exec failed: $!: @$args";
1779 }
1780
1781
1782 sub ban_node_by_slot {
1783   # Don't start any new jobsteps on this node for 60 seconds
1784   my $slotid = shift;
1785   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1786   $slot[$slotid]->{node}->{hold_count}++;
1787   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1788 }
1789
1790 sub must_lock_now
1791 {
1792   my ($lockfile, $error_message) = @_;
1793   open L, ">", $lockfile or croak("$lockfile: $!");
1794   if (!flock L, LOCK_EX|LOCK_NB) {
1795     croak("Can't lock $lockfile: $error_message\n");
1796   }
1797 }
1798
1799 sub find_docker_image {
1800   # Given a Keep locator, check to see if it contains a Docker image.
1801   # If so, return its stream name and Docker hash.
1802   # If not, return undef for both values.
1803   my $locator = shift;
1804   my ($streamname, $filename);
1805   my $image = api_call("collections/get", uuid => $locator);
1806   if ($image) {
1807     foreach my $line (split(/\n/, $image->{manifest_text})) {
1808       my @tokens = split(/\s+/, $line);
1809       next if (!@tokens);
1810       $streamname = shift(@tokens);
1811       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1812         if (defined($filename)) {
1813           return (undef, undef);  # More than one file in the Collection.
1814         } else {
1815           $filename = (split(/:/, $filedata, 3))[2];
1816         }
1817       }
1818     }
1819   }
1820   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1821     return ($streamname, $1);
1822   } else {
1823     return (undef, undef);
1824   }
1825 }
1826
1827 sub retry_count {
1828   # Calculate the number of times an operation should be retried,
1829   # assuming exponential backoff, and that we're willing to retry as
1830   # long as tasks have been running.  Enforce a minimum of 3 retries.
1831   my ($starttime, $endtime, $timediff, $retries);
1832   if (@jobstep) {
1833     $starttime = $jobstep[0]->{starttime};
1834     $endtime = $jobstep[-1]->{finishtime};
1835   }
1836   if (!defined($starttime)) {
1837     $timediff = 0;
1838   } elsif (!defined($endtime)) {
1839     $timediff = time - $starttime;
1840   } else {
1841     $timediff = ($endtime - $starttime) - (time - $endtime);
1842   }
1843   if ($timediff > 0) {
1844     $retries = int(log($timediff) / log(2));
1845   } else {
1846     $retries = 1;  # Use the minimum.
1847   }
1848   return ($retries > 3) ? $retries : 3;
1849 }
1850
1851 sub retry_op {
1852   # Pass in two function references.
1853   # This method will be called with the remaining arguments.
1854   # If it dies, retry it with exponential backoff until it succeeds,
1855   # or until the current retry_count is exhausted.  After each failure
1856   # that can be retried, the second function will be called with
1857   # the current try count (0-based), next try time, and error message.
1858   my $operation = shift;
1859   my $retry_callback = shift;
1860   my $retries = retry_count();
1861   foreach my $try_count (0..$retries) {
1862     my $next_try = time + (2 ** $try_count);
1863     my $result = eval { $operation->(@_); };
1864     if (!$@) {
1865       return $result;
1866     } elsif ($try_count < $retries) {
1867       $retry_callback->($try_count, $next_try, $@);
1868       my $sleep_time = $next_try - time;
1869       sleep($sleep_time) if ($sleep_time > 0);
1870     }
1871   }
1872   # Ensure the error message ends in a newline, so Perl doesn't add
1873   # retry_op's line number to it.
1874   chomp($@);
1875   die($@ . "\n");
1876 }
1877
1878 sub api_call {
1879   # Pass in a /-separated API method name, and arguments for it.
1880   # This function will call that method, retrying as needed until
1881   # the current retry_count is exhausted, with a log on the first failure.
1882   my $method_name = shift;
1883   my $log_api_retry = sub {
1884     my ($try_count, $next_try_at, $errmsg) = @_;
1885     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1886     $errmsg =~ s/\s/ /g;
1887     $errmsg =~ s/\s+$//;
1888     my $retry_msg;
1889     if ($next_try_at < time) {
1890       $retry_msg = "Retrying.";
1891     } else {
1892       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1893       $retry_msg = "Retrying at $next_try_fmt.";
1894     }
1895     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1896   };
1897   my $method = $arv;
1898   foreach my $key (split(/\//, $method_name)) {
1899     $method = $method->{$key};
1900   }
1901   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1902 }
1903
1904 sub exit_status_s {
1905   # Given a $?, return a human-readable exit code string like "0" or
1906   # "1" or "0 with signal 1" or "1 with signal 11".
1907   my $exitcode = shift;
1908   my $s = $exitcode >> 8;
1909   if ($exitcode & 0x7f) {
1910     $s .= " with signal " . ($exitcode & 0x7f);
1911   }
1912   if ($exitcode & 0x80) {
1913     $s .= " with core dump";
1914   }
1915   return $s;
1916 }
1917
1918 sub handle_readall {
1919   # Pass in a glob reference to a file handle.
1920   # Read all its contents and return them as a string.
1921   my $fh_glob_ref = shift;
1922   local $/ = undef;
1923   return <$fh_glob_ref>;
1924 }
1925
1926 sub tar_filename_n {
1927   my $n = shift;
1928   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1929 }
1930
1931 sub add_git_archive {
1932   # Pass in a git archive command as a string or list, a la system().
1933   # This method will save its output to be included in the archive sent to the
1934   # build script.
1935   my $git_input;
1936   $git_tar_count++;
1937   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1938     croak("Failed to save git archive: $!");
1939   }
1940   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1941   close($git_input);
1942   waitpid($git_pid, 0);
1943   close(GIT_ARCHIVE);
1944   if ($?) {
1945     croak("Failed to save git archive: git exited " . exit_status_s($?));
1946   }
1947 }
1948
1949 sub combined_git_archive {
1950   # Combine all saved tar archives into a single archive, then return its
1951   # contents in a string.  Return undef if no archives have been saved.
1952   if ($git_tar_count < 1) {
1953     return undef;
1954   }
1955   my $base_tar_name = tar_filename_n(1);
1956   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1957     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1958     if ($tar_exit != 0) {
1959       croak("Error preparing build archive: tar -A exited " .
1960             exit_status_s($tar_exit));
1961     }
1962   }
1963   if (!open(GIT_TAR, "<", $base_tar_name)) {
1964     croak("Could not open build archive: $!");
1965   }
1966   my $tar_contents = handle_readall(\*GIT_TAR);
1967   close(GIT_TAR);
1968   return $tar_contents;
1969 }
1970
1971 sub set_nonblocking {
1972   my $fh = shift;
1973   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
1974   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
1975 }
1976
1977 __DATA__
1978 #!/usr/bin/perl
1979 #
1980 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1981 # server invokes this script on individual compute nodes, or localhost if we're
1982 # running a job locally.  It gets called in two modes:
1983 #
1984 # * No arguments: Installation mode.  Read a tar archive from the DATA
1985 #   file handle; it includes the Crunch script's source code, and
1986 #   maybe SDKs as well.  Those should be installed in the proper
1987 #   locations.  This runs outside of any Docker container, so don't try to
1988 #   introspect Crunch's runtime environment.
1989 #
1990 # * With arguments: Crunch script run mode.  This script should set up the
1991 #   environment, then run the command specified in the arguments.  This runs
1992 #   inside any Docker container.
1993
1994 use Fcntl ':flock';
1995 use File::Path qw( make_path remove_tree );
1996 use POSIX qw(getcwd);
1997
1998 use constant TASK_TEMPFAIL => 111;
1999
2000 # Map SDK subdirectories to the path environments they belong to.
2001 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2002
2003 my $destdir = $ENV{"CRUNCH_SRC"};
2004 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2005 my $repo = $ENV{"CRUNCH_SRC_URL"};
2006 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2007 my $job_work = $ENV{"JOB_WORK"};
2008 my $task_work = $ENV{"TASK_WORK"};
2009
2010 open(STDOUT_ORIG, ">&", STDOUT);
2011 open(STDERR_ORIG, ">&", STDERR);
2012
2013 for my $dir ($destdir, $job_work, $task_work) {
2014   if ($dir) {
2015     make_path $dir;
2016     -e $dir or die "Failed to create temporary directory ($dir): $!";
2017   }
2018 }
2019
2020 if ($task_work) {
2021   remove_tree($task_work, {keep_root => 1});
2022 }
2023
2024 ### Crunch script run mode
2025 if (@ARGV) {
2026   # We want to do routine logging during task 0 only.  This gives the user
2027   # the information they need, but avoids repeating the information for every
2028   # task.
2029   my $Log;
2030   if ($ENV{TASK_SEQUENCE} eq "0") {
2031     $Log = sub {
2032       my $msg = shift;
2033       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2034     };
2035   } else {
2036     $Log = sub { };
2037   }
2038
2039   my $python_src = "$install_dir/python";
2040   my $venv_dir = "$job_work/.arvados.venv";
2041   my $venv_built = -e "$venv_dir/bin/activate";
2042   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2043     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2044                  "--python=python2.7", $venv_dir);
2045     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2046     $venv_built = 1;
2047     $Log->("Built Python SDK virtualenv");
2048   }
2049
2050   my $pip_bin = "pip";
2051   if ($venv_built) {
2052     $Log->("Running in Python SDK virtualenv");
2053     $pip_bin = "$venv_dir/bin/pip";
2054     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2055     @ARGV = ("/bin/sh", "-ec",
2056              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2057   } elsif (-d $python_src) {
2058     $Log->("Warning: virtualenv not found inside Docker container default " .
2059            "\$PATH. Can't install Python SDK.");
2060   }
2061
2062   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2063   if ($pkgs) {
2064     $Log->("Using Arvados SDK:");
2065     foreach my $line (split /\n/, $pkgs) {
2066       $Log->($line);
2067     }
2068   } else {
2069     $Log->("Arvados SDK packages not found");
2070   }
2071
2072   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2073     my $sdk_path = "$install_dir/$sdk_dir";
2074     if (-d $sdk_path) {
2075       if ($ENV{$sdk_envkey}) {
2076         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2077       } else {
2078         $ENV{$sdk_envkey} = $sdk_path;
2079       }
2080       $Log->("Arvados SDK added to %s", $sdk_envkey);
2081     }
2082   }
2083
2084   exec(@ARGV);
2085   die "Cannot exec `@ARGV`: $!";
2086 }
2087
2088 ### Installation mode
2089 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2090 flock L, LOCK_EX;
2091 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2092   # This exact git archive (source + arvados sdk) is already installed
2093   # here, so there's no need to reinstall it.
2094
2095   # We must consume our DATA section, though: otherwise the process
2096   # feeding it to us will get SIGPIPE.
2097   my $buf;
2098   while (read(DATA, $buf, 65536)) { }
2099
2100   exit(0);
2101 }
2102
2103 unlink "$destdir.archive_hash";
2104 mkdir $destdir;
2105
2106 do {
2107   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2108   local $SIG{PIPE} = "IGNORE";
2109   warn "Extracting archive: $archive_hash\n";
2110   # --ignore-zeros is necessary sometimes: depending on how much NUL
2111   # padding tar -A put on our combined archive (which in turn depends
2112   # on the length of the component archives) tar without
2113   # --ignore-zeros will exit before consuming stdin and cause close()
2114   # to fail on the resulting SIGPIPE.
2115   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2116     die "Error launching 'tar -xC $destdir': $!";
2117   }
2118   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2119   # get SIGPIPE.  We must feed it data incrementally.
2120   my $tar_input;
2121   while (read(DATA, $tar_input, 65536)) {
2122     print TARX $tar_input;
2123   }
2124   if(!close(TARX)) {
2125     die "'tar -xC $destdir' exited $?: $!";
2126   }
2127 };
2128
2129 mkdir $install_dir;
2130
2131 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2132 if (-d $sdk_root) {
2133   foreach my $sdk_lang (("python",
2134                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2135     if (-d "$sdk_root/$sdk_lang") {
2136       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2137         die "Failed to install $sdk_lang SDK: $!";
2138       }
2139     }
2140   }
2141 }
2142
2143 my $python_dir = "$install_dir/python";
2144 if ((-d $python_dir) and can_run("python2.7") and
2145     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
2146   # egg_info failed, probably when it asked git for a build tag.
2147   # Specify no build tag.
2148   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2149   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2150   close($pysdk_cfg);
2151 }
2152
2153 # Hide messages from the install script (unless it fails: shell_or_die
2154 # will show $destdir.log in that case).
2155 open(STDOUT, ">>", "$destdir.log");
2156 open(STDERR, ">&", STDOUT);
2157
2158 if (-e "$destdir/crunch_scripts/install") {
2159     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2160 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2161     # Old version
2162     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2163 } elsif (-e "./install.sh") {
2164     shell_or_die (undef, "./install.sh", $install_dir);
2165 }
2166
2167 if ($archive_hash) {
2168     unlink "$destdir.archive_hash.new";
2169     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2170     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2171 }
2172
2173 close L;
2174
2175 sub can_run {
2176   my $command_name = shift;
2177   open(my $which, "-|", "which", $command_name);
2178   while (<$which>) { }
2179   close($which);
2180   return ($? == 0);
2181 }
2182
2183 sub shell_or_die
2184 {
2185   my $exitcode = shift;
2186
2187   if ($ENV{"DEBUG"}) {
2188     print STDERR "@_\n";
2189   }
2190   if (system (@_) != 0) {
2191     my $err = $!;
2192     my $code = $?;
2193     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2194     open STDERR, ">&STDERR_ORIG";
2195     system ("cat $destdir.log >&2");
2196     warn "@_ failed ($err): $exitstatus";
2197     if (defined($exitcode)) {
2198       exit $exitcode;
2199     }
2200     else {
2201       exit (($code >> 8) || 1);
2202     }
2203   }
2204 }
2205
2206 __DATA__