Merge branch '8319-bcbio-cwl' closes #8319
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 my $docker_run_args = "";
131 GetOptions('force-unlock' => \$force_unlock,
132            'git-dir=s' => \$git_dir,
133            'job=s' => \$jobspec,
134            'job-api-token=s' => \$job_api_token,
135            'no-clear-tmp' => \$no_clear_tmp,
136            'resume-stash=s' => \$resume_stash,
137            'docker-bin=s' => \$docker_bin,
138            'docker-run-args=s' => \$docker_run_args,
139     );
140
141 if (defined $job_api_token) {
142   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
143 }
144
145 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
146
147
148 $SIG{'USR1'} = sub
149 {
150   $main::ENV{CRUNCH_DEBUG} = 1;
151 };
152 $SIG{'USR2'} = sub
153 {
154   $main::ENV{CRUNCH_DEBUG} = 0;
155 };
156
157 my $arv = Arvados->new('apiVersion' => 'v1');
158
159 my $Job;
160 my $job_id;
161 my $dbh;
162 my $sth;
163 my @jobstep;
164
165 my $local_job;
166 if ($jobspec =~ /^[-a-z\d]+$/)
167 {
168   # $jobspec is an Arvados UUID, not a JSON job specification
169   $Job = api_call("jobs/get", uuid => $jobspec);
170   $local_job = 0;
171 }
172 else
173 {
174   $local_job = JSON::decode_json($jobspec);
175 }
176
177
178 # Make sure our workers (our slurm nodes, localhost, or whatever) are
179 # at least able to run basic commands: they aren't down or severely
180 # misconfigured.
181 my $cmd = ['true'];
182 if (($Job || $local_job)->{docker_image_locator}) {
183   $cmd = [$docker_bin, 'ps', '-q'];
184 }
185 Log(undef, "Sanity check is `@$cmd`");
186 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
187      $cmd,
188      {fork => 1});
189 if ($? != 0) {
190   Log(undef, "Sanity check failed: ".exit_status_s($?));
191   exit EX_TEMPFAIL;
192 }
193 Log(undef, "Sanity check OK");
194
195
196 my $User = api_call("users/current");
197
198 if (!$local_job) {
199   if (!$force_unlock) {
200     # Claim this job, and make sure nobody else does
201     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
202     if ($@) {
203       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
204       exit EX_TEMPFAIL;
205     };
206   }
207 }
208 else
209 {
210   if (!$resume_stash)
211   {
212     map { croak ("No $_ specified") unless $local_job->{$_} }
213     qw(script script_version script_parameters);
214   }
215
216   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
217   $local_job->{'started_at'} = gmtime;
218   $local_job->{'state'} = 'Running';
219
220   $Job = api_call("jobs/create", job => $local_job);
221 }
222 $job_id = $Job->{'uuid'};
223
224 my $keep_logfile = $job_id . '.log.txt';
225 log_writer_start($keep_logfile);
226
227 $Job->{'runtime_constraints'} ||= {};
228 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
229 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
230
231 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
232 if ($? == 0) {
233   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
234   chomp($gem_versions);
235   chop($gem_versions);  # Closing parentheses
236 } else {
237   $gem_versions = "";
238 }
239 Log(undef,
240     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
241
242 Log (undef, "check slurm allocation");
243 my @slot;
244 my @node;
245 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
246 my @sinfo;
247 if (!$have_slurm)
248 {
249   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
250   push @sinfo, "$localcpus localhost";
251 }
252 if (exists $ENV{SLURM_NODELIST})
253 {
254   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
255 }
256 foreach (@sinfo)
257 {
258   my ($ncpus, $slurm_nodelist) = split;
259   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
260
261   my @nodelist;
262   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
263   {
264     my $nodelist = $1;
265     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
266     {
267       my $ranges = $1;
268       foreach (split (",", $ranges))
269       {
270         my ($a, $b);
271         if (/(\d+)-(\d+)/)
272         {
273           $a = $1;
274           $b = $2;
275         }
276         else
277         {
278           $a = $_;
279           $b = $_;
280         }
281         push @nodelist, map {
282           my $n = $nodelist;
283           $n =~ s/\[[-,\d]+\]/$_/;
284           $n;
285         } ($a..$b);
286       }
287     }
288     else
289     {
290       push @nodelist, $nodelist;
291     }
292   }
293   foreach my $nodename (@nodelist)
294   {
295     Log (undef, "node $nodename - $ncpus slots");
296     my $node = { name => $nodename,
297                  ncpus => $ncpus,
298                  # The number of consecutive times a task has been dispatched
299                  # to this node and failed.
300                  losing_streak => 0,
301                  # The number of consecutive times that SLURM has reported
302                  # a node failure since the last successful task.
303                  fail_count => 0,
304                  # Don't dispatch work to this node until this time
305                  # (in seconds since the epoch) has passed.
306                  hold_until => 0 };
307     foreach my $cpu (1..$ncpus)
308     {
309       push @slot, { node => $node,
310                     cpu => $cpu };
311     }
312   }
313   push @node, @nodelist;
314 }
315
316
317
318 # Ensure that we get one jobstep running on each allocated node before
319 # we start overloading nodes with concurrent steps
320
321 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
322
323
324 $Job->update_attributes(
325   'tasks_summary' => { 'failed' => 0,
326                        'todo' => 1,
327                        'running' => 0,
328                        'done' => 0 });
329
330 Log (undef, "start");
331 $SIG{'INT'} = sub { $main::please_freeze = 1; };
332 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
333 $SIG{'TERM'} = \&croak;
334 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
335 $SIG{'ALRM'} = sub { $main::please_info = 1; };
336 $SIG{'CONT'} = sub { $main::please_continue = 1; };
337 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
338
339 $main::please_freeze = 0;
340 $main::please_info = 0;
341 $main::please_continue = 0;
342 $main::please_refresh = 0;
343 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
344
345 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
346 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
347 $ENV{"JOB_UUID"} = $job_id;
348
349
350 my @jobstep_todo = ();
351 my @jobstep_done = ();
352 my @jobstep_tomerge = ();
353 my $jobstep_tomerge_level = 0;
354 my $squeue_checked = 0;
355 my $latest_refresh = scalar time;
356
357
358
359 if (defined $Job->{thawedfromkey})
360 {
361   thaw ($Job->{thawedfromkey});
362 }
363 else
364 {
365   my $first_task = api_call("job_tasks/create", job_task => {
366     'job_uuid' => $Job->{'uuid'},
367     'sequence' => 0,
368     'qsequence' => 0,
369     'parameters' => {},
370   });
371   push @jobstep, { 'level' => 0,
372                    'failures' => 0,
373                    'arvados_task' => $first_task,
374                  };
375   push @jobstep_todo, 0;
376 }
377
378
379 if (!$have_slurm)
380 {
381   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
382 }
383
384 my $build_script = handle_readall(\*DATA);
385 my $nodelist = join(",", @node);
386 my $git_tar_count = 0;
387
388 if (!defined $no_clear_tmp) {
389   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
390   Log (undef, "Clean work dirs");
391
392   my $cleanpid = fork();
393   if ($cleanpid == 0)
394   {
395     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
396     # Then clean up work directories.
397     # TODO: When #5036 is done and widely deployed, we can limit mount's
398     # -t option to simply fuse.keep.
399     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
400           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
401     exit (1);
402   }
403   while (1)
404   {
405     last if $cleanpid == waitpid (-1, WNOHANG);
406     freeze_if_want_freeze ($cleanpid);
407     select (undef, undef, undef, 0.1);
408   }
409   if ($?) {
410     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
411     exit(EX_RETRY_UNLOCKED);
412   }
413 }
414
415 # If this job requires a Docker image, install that.
416 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
417 if ($docker_locator = $Job->{docker_image_locator}) {
418   Log (undef, "Install docker image $docker_locator");
419   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
420   if (!$docker_hash)
421   {
422     croak("No Docker image hash found from locator $docker_locator");
423   }
424   Log (undef, "docker image hash is $docker_hash");
425   $docker_stream =~ s/^\.//;
426   my $docker_install_script = qq{
427 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
428     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
429 fi
430 };
431   my $docker_pid = fork();
432   if ($docker_pid == 0)
433   {
434     srun (["srun", "--nodelist=" . join(',', @node)],
435           ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script]);
436     exit ($?);
437   }
438   while (1)
439   {
440     last if $docker_pid == waitpid (-1, WNOHANG);
441     freeze_if_want_freeze ($docker_pid);
442     select (undef, undef, undef, 0.1);
443   }
444   if ($? != 0)
445   {
446     Log(undef, "Installing Docker image from $docker_locator exited " . exit_status_s($?));
447     exit(EX_RETRY_UNLOCKED);
448   }
449
450   # Determine whether this version of Docker supports memory+swap limits.
451   srun(["srun", "--nodelist=" . $node[0]],
452        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
453       {fork => 1});
454   $docker_limitmem = ($? == 0);
455
456   # Find a non-root Docker user to use.
457   # Tries the default user for the container, then 'crunch', then 'nobody',
458   # testing for whether the actual user id is non-zero.  This defends against
459   # mistakes but not malice, but we intend to harden the security in the future
460   # so we don't want anyone getting used to their jobs running as root in their
461   # Docker containers.
462   my @tryusers = ("", "crunch", "nobody");
463   foreach my $try_user (@tryusers) {
464     my $try_user_arg;
465     if ($try_user eq "") {
466       Log(undef, "Checking if container default user is not UID 0");
467       $try_user_arg = "";
468     } else {
469       Log(undef, "Checking if user '$try_user' is not UID 0");
470       $try_user_arg = "--user=$try_user";
471     }
472     srun(["srun", "--nodelist=" . $node[0]],
473          ["/bin/sh", "-ec",
474           "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
475           " test \$a -ne 0"],
476          {fork => 1});
477     if ($? == 0) {
478       $dockeruserarg = $try_user_arg;
479       if ($try_user eq "") {
480         Log(undef, "Container will run with default user");
481       } else {
482         Log(undef, "Container will run with $dockeruserarg");
483       }
484       last;
485     }
486   }
487
488   if (!defined $dockeruserarg) {
489     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
490   }
491
492   if ($Job->{arvados_sdk_version}) {
493     # The job also specifies an Arvados SDK version.  Add the SDKs to the
494     # tar file for the build script to install.
495     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
496                        $Job->{arvados_sdk_version}));
497     add_git_archive("git", "--git-dir=$git_dir", "archive",
498                     "--prefix=.arvados.sdk/",
499                     $Job->{arvados_sdk_version}, "sdk");
500   }
501 }
502
503 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
504   # If script_version looks like an absolute path, *and* the --git-dir
505   # argument was not given -- which implies we were not invoked by
506   # crunch-dispatch -- we will use the given path as a working
507   # directory instead of resolving script_version to a git commit (or
508   # doing anything else with git).
509   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
510   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
511 }
512 else {
513   # Resolve the given script_version to a git commit sha1. Also, if
514   # the repository is remote, clone it into our local filesystem: this
515   # ensures "git archive" will work, and is necessary to reliably
516   # resolve a symbolic script_version like "master^".
517   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
518
519   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
520
521   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
522
523   # If we're running under crunch-dispatch, it will have already
524   # pulled the appropriate source tree into its own repository, and
525   # given us that repo's path as $git_dir.
526   #
527   # If we're running a "local" job, we might have to fetch content
528   # from a remote repository.
529   #
530   # (Currently crunch-dispatch gives a local path with --git-dir, but
531   # we might as well accept URLs there too in case it changes its
532   # mind.)
533   my $repo = $git_dir || $Job->{'repository'};
534
535   # Repository can be remote or local. If remote, we'll need to fetch it
536   # to a local dir before doing `git log` et al.
537   my $repo_location;
538
539   if ($repo =~ m{://|^[^/]*:}) {
540     # $repo is a git url we can clone, like git:// or https:// or
541     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
542     # not recognized here because distinguishing that from a local
543     # path is too fragile. If you really need something strange here,
544     # use the ssh:// form.
545     $repo_location = 'remote';
546   } elsif ($repo =~ m{^\.*/}) {
547     # $repo is a local path to a git index. We'll also resolve ../foo
548     # to ../foo/.git if the latter is a directory. To help
549     # disambiguate local paths from named hosted repositories, this
550     # form must be given as ./ or ../ if it's a relative path.
551     if (-d "$repo/.git") {
552       $repo = "$repo/.git";
553     }
554     $repo_location = 'local';
555   } else {
556     # $repo is none of the above. It must be the name of a hosted
557     # repository.
558     my $arv_repo_list = api_call("repositories/list",
559                                  'filters' => [['name','=',$repo]]);
560     my @repos_found = @{$arv_repo_list->{'items'}};
561     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
562     if ($n_found > 0) {
563       Log(undef, "Repository '$repo' -> "
564           . join(", ", map { $_->{'uuid'} } @repos_found));
565     }
566     if ($n_found != 1) {
567       croak("Error: Found $n_found repositories with name '$repo'.");
568     }
569     $repo = $repos_found[0]->{'fetch_url'};
570     $repo_location = 'remote';
571   }
572   Log(undef, "Using $repo_location repository '$repo'");
573   $ENV{"CRUNCH_SRC_URL"} = $repo;
574
575   # Resolve given script_version (we'll call that $treeish here) to a
576   # commit sha1 ($commit).
577   my $treeish = $Job->{'script_version'};
578   my $commit;
579   if ($repo_location eq 'remote') {
580     # We minimize excess object-fetching by re-using the same bare
581     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
582     # just keep adding remotes to it as needed.
583     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
584     my $gitcmd = "git --git-dir=\Q$local_repo\E";
585
586     # Set up our local repo for caching remote objects, making
587     # archives, etc.
588     if (!-d $local_repo) {
589       make_path($local_repo) or croak("Error: could not create $local_repo");
590     }
591     # This works (exits 0 and doesn't delete fetched objects) even
592     # if $local_repo is already initialized:
593     `$gitcmd init --bare`;
594     if ($?) {
595       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
596     }
597
598     # If $treeish looks like a hash (or abbrev hash) we look it up in
599     # our local cache first, since that's cheaper. (We don't want to
600     # do that with tags/branches though -- those change over time, so
601     # they should always be resolved by the remote repo.)
602     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
603       # Hide stderr because it's normal for this to fail:
604       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
605       if ($? == 0 &&
606           # Careful not to resolve a branch named abcdeff to commit 1234567:
607           $sha1 =~ /^$treeish/ &&
608           $sha1 =~ /^([0-9a-f]{40})$/s) {
609         $commit = $1;
610         Log(undef, "Commit $commit already present in $local_repo");
611       }
612     }
613
614     if (!defined $commit) {
615       # If $treeish isn't just a hash or abbrev hash, or isn't here
616       # yet, we need to fetch the remote to resolve it correctly.
617
618       # First, remove all local heads. This prevents a name that does
619       # not exist on the remote from resolving to (or colliding with)
620       # a previously fetched branch or tag (possibly from a different
621       # remote).
622       remove_tree("$local_repo/refs/heads", {keep_root => 1});
623
624       Log(undef, "Fetching objects from $repo to $local_repo");
625       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
626       if ($?) {
627         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
628       }
629     }
630
631     # Now that the data is all here, we will use our local repo for
632     # the rest of our git activities.
633     $repo = $local_repo;
634   }
635
636   my $gitcmd = "git --git-dir=\Q$repo\E";
637   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
638   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
639     croak("`$gitcmd rev-list` exited "
640           .exit_status_s($?)
641           .", '$treeish' not found, giving up");
642   }
643   $commit = $1;
644   Log(undef, "Version $treeish is commit $commit");
645
646   if ($commit ne $Job->{'script_version'}) {
647     # Record the real commit id in the database, frozentokey, logs,
648     # etc. -- instead of an abbreviation or a branch name which can
649     # become ambiguous or point to a different commit in the future.
650     if (!$Job->update_attributes('script_version' => $commit)) {
651       croak("Error: failed to update job's script_version attribute");
652     }
653   }
654
655   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
656   add_git_archive("$gitcmd archive ''\Q$commit\E");
657 }
658
659 my $git_archive = combined_git_archive();
660 if (!defined $git_archive) {
661   Log(undef, "Skip install phase (no git archive)");
662   if ($have_slurm) {
663     Log(undef, "Warning: This probably means workers have no source tree!");
664   }
665 }
666 else {
667   my $install_exited;
668   my $install_script_tries_left = 3;
669   for (my $attempts = 0; $attempts < 3; $attempts++) {
670     Log(undef, "Run install script on all workers");
671
672     my @srunargs = ("srun",
673                     "--nodelist=$nodelist",
674                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
675     my @execargs = ("sh", "-c",
676                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
677
678     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
679     my ($install_stderr_r, $install_stderr_w);
680     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
681     set_nonblocking($install_stderr_r);
682     my $installpid = fork();
683     if ($installpid == 0)
684     {
685       close($install_stderr_r);
686       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
687       open(STDOUT, ">&", $install_stderr_w);
688       open(STDERR, ">&", $install_stderr_w);
689       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
690       exit (1);
691     }
692     close($install_stderr_w);
693     # Tell freeze_if_want_freeze how to kill the child, otherwise the
694     # "waitpid(installpid)" loop won't get interrupted by a freeze:
695     $proc{$installpid} = {};
696     my $stderr_buf = '';
697     # Track whether anything appears on stderr other than slurm errors
698     # ("srun: ...") and the "starting: ..." message printed by the
699     # srun subroutine itself:
700     my $stderr_anything_from_script = 0;
701     my $match_our_own_errors = '^(srun: error: |starting: \[)';
702     while ($installpid != waitpid(-1, WNOHANG)) {
703       freeze_if_want_freeze ($installpid);
704       # Wait up to 0.1 seconds for something to appear on stderr, then
705       # do a non-blocking read.
706       my $bits = fhbits($install_stderr_r);
707       select ($bits, undef, $bits, 0.1);
708       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
709       {
710         while ($stderr_buf =~ /^(.*?)\n/) {
711           my $line = $1;
712           substr $stderr_buf, 0, 1+length($line), "";
713           Log(undef, "stderr $line");
714           if ($line !~ /$match_our_own_errors/) {
715             $stderr_anything_from_script = 1;
716           }
717         }
718       }
719     }
720     delete $proc{$installpid};
721     $install_exited = $?;
722     close($install_stderr_r);
723     if (length($stderr_buf) > 0) {
724       if ($stderr_buf !~ /$match_our_own_errors/) {
725         $stderr_anything_from_script = 1;
726       }
727       Log(undef, "stderr $stderr_buf")
728     }
729
730     Log (undef, "Install script exited ".exit_status_s($install_exited));
731     last if $install_exited == 0 || $main::please_freeze;
732     # If the install script fails but doesn't print an error message,
733     # the next thing anyone is likely to do is just run it again in
734     # case it was a transient problem like "slurm communication fails
735     # because the network isn't reliable enough". So we'll just do
736     # that ourselves (up to 3 attempts in total). OTOH, if there is an
737     # error message, the problem is more likely to have a real fix and
738     # we should fail the job so the fixing process can start, instead
739     # of doing 2 more attempts.
740     last if $stderr_anything_from_script;
741   }
742
743   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
744     unlink($tar_filename);
745   }
746
747   if ($install_exited != 0) {
748     croak("Giving up");
749   }
750 }
751
752 foreach (qw (script script_version script_parameters runtime_constraints))
753 {
754   Log (undef,
755        "$_ " .
756        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
757 }
758 foreach (split (/\n/, $Job->{knobs}))
759 {
760   Log (undef, "knob " . $_);
761 }
762
763
764
765 $main::success = undef;
766
767
768
769 ONELEVEL:
770
771 my $thisround_succeeded = 0;
772 my $thisround_failed = 0;
773 my $thisround_failed_multiple = 0;
774 my $working_slot_count = scalar(@slot);
775
776 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
777                        or $a <=> $b } @jobstep_todo;
778 my $level = $jobstep[$jobstep_todo[0]]->{level};
779
780 my $initial_tasks_this_level = 0;
781 foreach my $id (@jobstep_todo) {
782   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
783 }
784
785 # If the number of tasks scheduled at this level #T is smaller than the number
786 # of slots available #S, only use the first #T slots, or the first slot on
787 # each node, whichever number is greater.
788 #
789 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
790 # based on these numbers.  Using fewer slots makes more resources available
791 # to each individual task, which should normally be a better strategy when
792 # there are fewer of them running with less parallelism.
793 #
794 # Note that this calculation is not redone if the initial tasks at
795 # this level queue more tasks at the same level.  This may harm
796 # overall task throughput for that level.
797 my @freeslot;
798 if ($initial_tasks_this_level < @node) {
799   @freeslot = (0..$#node);
800 } elsif ($initial_tasks_this_level < @slot) {
801   @freeslot = (0..$initial_tasks_this_level - 1);
802 } else {
803   @freeslot = (0..$#slot);
804 }
805 my $round_num_freeslots = scalar(@freeslot);
806
807 my %round_max_slots = ();
808 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
809   my $this_slot = $slot[$freeslot[$ii]];
810   my $node_name = $this_slot->{node}->{name};
811   $round_max_slots{$node_name} ||= $this_slot->{cpu};
812   last if (scalar(keys(%round_max_slots)) >= @node);
813 }
814
815 Log(undef, "start level $level with $round_num_freeslots slots");
816 my @holdslot;
817 my %reader;
818 my $progress_is_dirty = 1;
819 my $progress_stats_updated = 0;
820
821 update_progress_stats();
822
823
824 THISROUND:
825 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
826 {
827   # Don't create new tasks if we already know the job's final result.
828   last if defined($main::success);
829
830   my $id = $jobstep_todo[$todo_ptr];
831   my $Jobstep = $jobstep[$id];
832   if ($Jobstep->{level} != $level)
833   {
834     next;
835   }
836
837   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
838   set_nonblocking($reader{$id});
839
840   my $childslot = $freeslot[0];
841   my $childnode = $slot[$childslot]->{node};
842   my $childslotname = join (".",
843                             $slot[$childslot]->{node}->{name},
844                             $slot[$childslot]->{cpu});
845
846   my $childpid = fork();
847   if ($childpid == 0)
848   {
849     $SIG{'INT'} = 'DEFAULT';
850     $SIG{'QUIT'} = 'DEFAULT';
851     $SIG{'TERM'} = 'DEFAULT';
852
853     foreach (values (%reader))
854     {
855       close($_);
856     }
857     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
858     open(STDOUT,">&writer");
859     open(STDERR,">&writer");
860
861     undef $dbh;
862     undef $sth;
863
864     delete $ENV{"GNUPGHOME"};
865     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
866     $ENV{"TASK_QSEQUENCE"} = $id;
867     $ENV{"TASK_SEQUENCE"} = $level;
868     $ENV{"JOB_SCRIPT"} = $Job->{script};
869     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
870       $param =~ tr/a-z/A-Z/;
871       $ENV{"JOB_PARAMETER_$param"} = $value;
872     }
873     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
874     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
875     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
876     $ENV{"HOME"} = $ENV{"TASK_WORK"};
877     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
878     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
879     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
880
881     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
882
883     $ENV{"GZIP"} = "-n";
884
885     my @srunargs = (
886       "srun",
887       "--nodelist=".$childnode->{name},
888       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
889       "--job-name=$job_id.$id.$$",
890         );
891
892     my $stdbuf = " stdbuf --output=0 --error=0 ";
893
894     my $arv_file_cache = "";
895     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
896       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
897     }
898
899     my $command =
900         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
901         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
902         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
903         # These environment variables get used explicitly later in
904         # $command.  No tool is expected to read these values directly.
905         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
906         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
907         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
908         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
909
910     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
911     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
912     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
913
914     if ($docker_hash)
915     {
916       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
917       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
918       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
919       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
920       # We only set memory limits if Docker lets us limit both memory and swap.
921       # Memory limits alone have been supported longer, but subprocesses tend
922       # to get SIGKILL if they exceed that without any swap limit set.
923       # See #5642 for additional background.
924       if ($docker_limitmem) {
925         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
926       }
927
928       # The source tree and $destdir directory (which we have
929       # installed on the worker host) are available in the container,
930       # under the same path.
931       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
932       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
933
934       # Currently, we make the "by_pdh" directory in arv-mount's mount
935       # point appear at /keep inside the container (instead of using
936       # the same path as the host like we do with CRUNCH_SRC and
937       # CRUNCH_INSTALL). However, crunch scripts and utilities must
938       # not rely on this. They must use $TASK_KEEPMOUNT.
939       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
940       $ENV{TASK_KEEPMOUNT} = "/keep";
941
942       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
943       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
944       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
945
946       # TASK_WORK is almost exactly like a docker data volume: it
947       # starts out empty, is writable, and persists until no
948       # containers use it any more. We don't use --volumes-from to
949       # share it with other containers: it is only accessible to this
950       # task, and it goes away when this task stops.
951       #
952       # However, a docker data volume is writable only by root unless
953       # the mount point already happens to exist in the container with
954       # different permissions. Therefore, we [1] assume /tmp already
955       # exists in the image and is writable by the crunch user; [2]
956       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
957       # writable if they are created by docker while setting up the
958       # other --volumes); and [3] create $TASK_WORK inside the
959       # container using $build_script.
960       $command .= "--volume=/tmp ";
961       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
962       $ENV{"HOME"} = $ENV{"TASK_WORK"};
963       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
964
965       # TODO: Share a single JOB_WORK volume across all task
966       # containers on a given worker node, and delete it when the job
967       # ends (and, in case that doesn't work, when the next job
968       # starts).
969       #
970       # For now, use the same approach as TASK_WORK above.
971       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
972
973       while (my ($env_key, $env_val) = each %ENV)
974       {
975         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
976           $command .= "--env=\Q$env_key=$env_val\E ";
977         }
978       }
979       $command .= "--env=\QHOME=$ENV{HOME}\E ";
980       $command .= "\Q$docker_hash\E ";
981
982       if ($Job->{arvados_sdk_version}) {
983         $command .= $stdbuf;
984         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
985       } else {
986         $command .= "/bin/sh -c \'python -c " .
987             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
988             ">&2 2>/dev/null; " .
989             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
990             "if which stdbuf >/dev/null ; then " .
991             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
992             " else " .
993             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
994             " fi\'";
995       }
996     } else {
997       # Non-docker run
998       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
999       $command .= $stdbuf;
1000       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
1001     }
1002
1003     my @execargs = ('bash', '-c', $command);
1004     srun (\@srunargs, \@execargs, undef, $build_script);
1005     # exec() failed, we assume nothing happened.
1006     die "srun() failed on build script\n";
1007   }
1008   close("writer");
1009   if (!defined $childpid)
1010   {
1011     close $reader{$id};
1012     delete $reader{$id};
1013     next;
1014   }
1015   shift @freeslot;
1016   $proc{$childpid} = { jobstep => $id,
1017                        time => time,
1018                        slot => $childslot,
1019                        jobstepname => "$job_id.$id.$childpid",
1020                      };
1021   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1022   $slot[$childslot]->{pid} = $childpid;
1023
1024   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1025   Log ($id, "child $childpid started on $childslotname");
1026   $Jobstep->{starttime} = time;
1027   $Jobstep->{node} = $childnode->{name};
1028   $Jobstep->{slotindex} = $childslot;
1029   delete $Jobstep->{stderr};
1030   delete $Jobstep->{finishtime};
1031   delete $Jobstep->{tempfail};
1032
1033   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1034   $Jobstep->{'arvados_task'}->save;
1035
1036   splice @jobstep_todo, $todo_ptr, 1;
1037   --$todo_ptr;
1038
1039   $progress_is_dirty = 1;
1040
1041   while (!@freeslot
1042          ||
1043          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1044   {
1045     last THISROUND if $main::please_freeze;
1046     if ($main::please_info)
1047     {
1048       $main::please_info = 0;
1049       freeze();
1050       create_output_collection();
1051       save_meta(1);
1052       update_progress_stats();
1053     }
1054     my $gotsome
1055         = readfrompipes ()
1056         + reapchildren ();
1057     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1058     {
1059       check_refresh_wanted();
1060       check_squeue();
1061       update_progress_stats();
1062     }
1063     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1064     {
1065       update_progress_stats();
1066     }
1067     if (!$gotsome) {
1068       select (undef, undef, undef, 0.1);
1069     }
1070     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1071                                         $_->{node}->{hold_count} < 4 } @slot);
1072     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1073         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1074     {
1075       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1076           .($thisround_failed+$thisround_succeeded)
1077           .") -- giving up on this round";
1078       Log (undef, $message);
1079       last THISROUND;
1080     }
1081
1082     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1083     for (my $i=$#freeslot; $i>=0; $i--) {
1084       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1085         push @holdslot, (splice @freeslot, $i, 1);
1086       }
1087     }
1088     for (my $i=$#holdslot; $i>=0; $i--) {
1089       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1090         push @freeslot, (splice @holdslot, $i, 1);
1091       }
1092     }
1093
1094     # give up if no nodes are succeeding
1095     if ($working_slot_count < 1) {
1096       Log(undef, "Every node has failed -- giving up");
1097       last THISROUND;
1098     }
1099   }
1100 }
1101
1102
1103 push @freeslot, splice @holdslot;
1104 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1105
1106
1107 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1108 while (%proc)
1109 {
1110   if ($main::please_continue) {
1111     $main::please_continue = 0;
1112     goto THISROUND;
1113   }
1114   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1115   readfrompipes ();
1116   if (!reapchildren())
1117   {
1118     check_refresh_wanted();
1119     check_squeue();
1120     update_progress_stats();
1121     select (undef, undef, undef, 0.1);
1122     killem (keys %proc) if $main::please_freeze;
1123   }
1124 }
1125
1126 update_progress_stats();
1127 freeze_if_want_freeze();
1128
1129
1130 if (!defined $main::success)
1131 {
1132   if (!@jobstep_todo) {
1133     $main::success = 1;
1134   } elsif ($working_slot_count < 1) {
1135     save_output_collection();
1136     save_meta();
1137     exit(EX_RETRY_UNLOCKED);
1138   } elsif ($thisround_succeeded == 0 &&
1139            ($thisround_failed == 0 || $thisround_failed > 4)) {
1140     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1141     Log (undef, $message);
1142     $main::success = 0;
1143   }
1144 }
1145
1146 goto ONELEVEL if !defined $main::success;
1147
1148
1149 release_allocation();
1150 freeze();
1151 my $collated_output = save_output_collection();
1152 Log (undef, "finish");
1153
1154 save_meta();
1155
1156 my $final_state;
1157 if ($collated_output && $main::success) {
1158   $final_state = 'Complete';
1159 } else {
1160   $final_state = 'Failed';
1161 }
1162 $Job->update_attributes('state' => $final_state);
1163
1164 exit (($final_state eq 'Complete') ? 0 : 1);
1165
1166
1167
1168 sub update_progress_stats
1169 {
1170   $progress_stats_updated = time;
1171   return if !$progress_is_dirty;
1172   my ($todo, $done, $running) = (scalar @jobstep_todo,
1173                                  scalar @jobstep_done,
1174                                  scalar keys(%proc));
1175   $Job->{'tasks_summary'} ||= {};
1176   $Job->{'tasks_summary'}->{'todo'} = $todo;
1177   $Job->{'tasks_summary'}->{'done'} = $done;
1178   $Job->{'tasks_summary'}->{'running'} = $running;
1179   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1180   Log (undef, "status: $done done, $running running, $todo todo");
1181   $progress_is_dirty = 0;
1182 }
1183
1184
1185
1186 sub reapchildren
1187 {
1188   my $pid = waitpid (-1, WNOHANG);
1189   return 0 if $pid <= 0;
1190
1191   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1192                   . "."
1193                   . $slot[$proc{$pid}->{slot}]->{cpu});
1194   my $jobstepid = $proc{$pid}->{jobstep};
1195   my $elapsed = time - $proc{$pid}->{time};
1196   my $Jobstep = $jobstep[$jobstepid];
1197
1198   my $childstatus = $?;
1199   my $exitvalue = $childstatus >> 8;
1200   my $exitinfo = "exit ".exit_status_s($childstatus);
1201   $Jobstep->{'arvados_task'}->reload;
1202   my $task_success = $Jobstep->{'arvados_task'}->{success};
1203
1204   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1205
1206   if (!defined $task_success) {
1207     # task did not indicate one way or the other --> fail
1208     Log($jobstepid, sprintf(
1209           "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1210           exit_status_s($childstatus)));
1211     $Jobstep->{'arvados_task'}->{success} = 0;
1212     $Jobstep->{'arvados_task'}->save;
1213     $task_success = 0;
1214   }
1215
1216   if (!$task_success)
1217   {
1218     my $temporary_fail;
1219     $temporary_fail ||= $Jobstep->{tempfail};
1220     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1221
1222     ++$thisround_failed;
1223     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1224
1225     # Check for signs of a failed or misconfigured node
1226     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1227         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1228       # Don't count this against jobstep failure thresholds if this
1229       # node is already suspected faulty and srun exited quickly
1230       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1231           $elapsed < 5) {
1232         Log ($jobstepid, "blaming failure on suspect node " .
1233              $slot[$proc{$pid}->{slot}]->{node}->{name});
1234         $temporary_fail ||= 1;
1235       }
1236       ban_node_by_slot($proc{$pid}->{slot});
1237     }
1238
1239     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1240                              ++$Jobstep->{'failures'},
1241                              $temporary_fail ? 'temporary' : 'permanent',
1242                              $elapsed));
1243
1244     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1245       # Give up on this task, and the whole job
1246       $main::success = 0;
1247     }
1248     # Put this task back on the todo queue
1249     push @jobstep_todo, $jobstepid;
1250     $Job->{'tasks_summary'}->{'failed'}++;
1251   }
1252   else
1253   {
1254     ++$thisround_succeeded;
1255     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1256     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1257     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1258     push @jobstep_done, $jobstepid;
1259     Log ($jobstepid, "success in $elapsed seconds");
1260   }
1261   $Jobstep->{exitcode} = $childstatus;
1262   $Jobstep->{finishtime} = time;
1263   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1264   $Jobstep->{'arvados_task'}->save;
1265   process_stderr ($jobstepid, $task_success);
1266   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1267                            length($Jobstep->{'arvados_task'}->{output}),
1268                            $Jobstep->{'arvados_task'}->{output}));
1269
1270   close $reader{$jobstepid};
1271   delete $reader{$jobstepid};
1272   delete $slot[$proc{$pid}->{slot}]->{pid};
1273   push @freeslot, $proc{$pid}->{slot};
1274   delete $proc{$pid};
1275
1276   if ($task_success) {
1277     # Load new tasks
1278     my $newtask_list = [];
1279     my $newtask_results;
1280     do {
1281       $newtask_results = api_call(
1282         "job_tasks/list",
1283         'where' => {
1284           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1285         },
1286         'order' => 'qsequence',
1287         'offset' => scalar(@$newtask_list),
1288       );
1289       push(@$newtask_list, @{$newtask_results->{items}});
1290     } while (@{$newtask_results->{items}});
1291     foreach my $arvados_task (@$newtask_list) {
1292       my $jobstep = {
1293         'level' => $arvados_task->{'sequence'},
1294         'failures' => 0,
1295         'arvados_task' => $arvados_task
1296       };
1297       push @jobstep, $jobstep;
1298       push @jobstep_todo, $#jobstep;
1299     }
1300   }
1301
1302   $progress_is_dirty = 1;
1303   1;
1304 }
1305
1306 sub check_refresh_wanted
1307 {
1308   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1309   if (@stat && $stat[9] > $latest_refresh) {
1310     $latest_refresh = scalar time;
1311     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1312     for my $attr ('cancelled_at',
1313                   'cancelled_by_user_uuid',
1314                   'cancelled_by_client_uuid',
1315                   'state') {
1316       $Job->{$attr} = $Job2->{$attr};
1317     }
1318     if ($Job->{'state'} ne "Running") {
1319       if ($Job->{'state'} eq "Cancelled") {
1320         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1321       } else {
1322         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1323       }
1324       $main::success = 0;
1325       $main::please_freeze = 1;
1326     }
1327   }
1328 }
1329
1330 sub check_squeue
1331 {
1332   my $last_squeue_check = $squeue_checked;
1333
1334   # Do not call `squeue` or check the kill list more than once every
1335   # 15 seconds.
1336   return if $last_squeue_check > time - 15;
1337   $squeue_checked = time;
1338
1339   # Look for children from which we haven't received stderr data since
1340   # the last squeue check. If no such children exist, all procs are
1341   # alive and there's no need to even look at squeue.
1342   #
1343   # As long as the crunchstat poll interval (10s) is shorter than the
1344   # squeue check interval (15s) this should make the squeue check an
1345   # infrequent event.
1346   my $silent_procs = 0;
1347   for my $procinfo (values %proc)
1348   {
1349     my $jobstep = $jobstep[$procinfo->{jobstep}];
1350     if ($jobstep->{stderr_at} < $last_squeue_check)
1351     {
1352       $silent_procs++;
1353     }
1354   }
1355   return if $silent_procs == 0;
1356
1357   # use killem() on procs whose killtime is reached
1358   while (my ($pid, $procinfo) = each %proc)
1359   {
1360     my $jobstep = $jobstep[$procinfo->{jobstep}];
1361     if (exists $procinfo->{killtime}
1362         && $procinfo->{killtime} <= time
1363         && $jobstep->{stderr_at} < $last_squeue_check)
1364     {
1365       my $sincewhen = "";
1366       if ($jobstep->{stderr_at}) {
1367         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1368       }
1369       Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1370       killem ($pid);
1371     }
1372   }
1373
1374   if (!$have_slurm)
1375   {
1376     # here is an opportunity to check for mysterious problems with local procs
1377     return;
1378   }
1379
1380   # Get a list of steps still running.  Note: squeue(1) says --steps
1381   # selects a format (which we override anyway) and allows us to
1382   # specify which steps we're interested in (which we don't).
1383   # Importantly, it also changes the meaning of %j from "job name" to
1384   # "step name" and (although this isn't mentioned explicitly in the
1385   # docs) switches from "one line per job" mode to "one line per step"
1386   # mode. Without it, we'd just get a list of one job, instead of a
1387   # list of N steps.
1388   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1389   if ($? != 0)
1390   {
1391     Log(undef, "warning: squeue exit status $? ($!)");
1392     return;
1393   }
1394   chop @squeue;
1395
1396   # which of my jobsteps are running, according to squeue?
1397   my %ok;
1398   for my $jobstepname (@squeue)
1399   {
1400     $ok{$jobstepname} = 1;
1401   }
1402
1403   # Check for child procs >60s old and not mentioned by squeue.
1404   while (my ($pid, $procinfo) = each %proc)
1405   {
1406     if ($procinfo->{time} < time - 60
1407         && $procinfo->{jobstepname}
1408         && !exists $ok{$procinfo->{jobstepname}}
1409         && !exists $procinfo->{killtime})
1410     {
1411       # According to slurm, this task has ended (successfully or not)
1412       # -- but our srun child hasn't exited. First we must wait (30
1413       # seconds) in case this is just a race between communication
1414       # channels. Then, if our srun child process still hasn't
1415       # terminated, we'll conclude some slurm communication
1416       # error/delay has caused the task to die without notifying srun,
1417       # and we'll kill srun ourselves.
1418       $procinfo->{killtime} = time + 30;
1419       Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1420     }
1421   }
1422 }
1423
1424
1425 sub release_allocation
1426 {
1427   if ($have_slurm)
1428   {
1429     Log (undef, "release job allocation");
1430     system "scancel $ENV{SLURM_JOB_ID}";
1431   }
1432 }
1433
1434
1435 sub readfrompipes
1436 {
1437   my $gotsome = 0;
1438   foreach my $job (keys %reader)
1439   {
1440     my $buf;
1441     if (0 < sysread ($reader{$job}, $buf, 65536))
1442     {
1443       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1444       $jobstep[$job]->{stderr_at} = time;
1445       $jobstep[$job]->{stderr} .= $buf;
1446
1447       # Consume everything up to the last \n
1448       preprocess_stderr ($job);
1449
1450       if (length ($jobstep[$job]->{stderr}) > 16384)
1451       {
1452         # If we get a lot of stderr without a newline, chop off the
1453         # front to avoid letting our buffer grow indefinitely.
1454         substr ($jobstep[$job]->{stderr},
1455                 0, length($jobstep[$job]->{stderr}) - 8192) = "";
1456       }
1457       $gotsome = 1;
1458     }
1459   }
1460   return $gotsome;
1461 }
1462
1463
1464 sub preprocess_stderr
1465 {
1466   my $job = shift;
1467
1468   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1469     my $line = $1;
1470     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1471     Log ($job, "stderr $line");
1472     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1473       # whoa.
1474       $main::please_freeze = 1;
1475     }
1476     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1477       my $job_slot_index = $jobstep[$job]->{slotindex};
1478       $slot[$job_slot_index]->{node}->{fail_count}++;
1479       $jobstep[$job]->{tempfail} = 1;
1480       ban_node_by_slot($job_slot_index);
1481     }
1482     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1483       $jobstep[$job]->{tempfail} = 1;
1484       ban_node_by_slot($jobstep[$job]->{slotindex});
1485     }
1486     elsif ($line =~ /arvados\.errors\.Keep/) {
1487       $jobstep[$job]->{tempfail} = 1;
1488     }
1489   }
1490 }
1491
1492
1493 sub process_stderr
1494 {
1495   my $job = shift;
1496   my $task_success = shift;
1497   preprocess_stderr ($job);
1498
1499   map {
1500     Log ($job, "stderr $_");
1501   } split ("\n", $jobstep[$job]->{stderr});
1502 }
1503
1504 sub fetch_block
1505 {
1506   my $hash = shift;
1507   my $keep;
1508   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1509     Log(undef, "fetch_block run error from arv-get $hash: $!");
1510     return undef;
1511   }
1512   my $output_block = "";
1513   while (1) {
1514     my $buf;
1515     my $bytes = sysread($keep, $buf, 1024 * 1024);
1516     if (!defined $bytes) {
1517       Log(undef, "fetch_block read error from arv-get: $!");
1518       $output_block = undef;
1519       last;
1520     } elsif ($bytes == 0) {
1521       # sysread returns 0 at the end of the pipe.
1522       last;
1523     } else {
1524       # some bytes were read into buf.
1525       $output_block .= $buf;
1526     }
1527   }
1528   close $keep;
1529   if ($?) {
1530     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1531     $output_block = undef;
1532   }
1533   return $output_block;
1534 }
1535
1536 # Create a collection by concatenating the output of all tasks (each
1537 # task's output is either a manifest fragment, a locator for a
1538 # manifest fragment stored in Keep, or nothing at all). Return the
1539 # portable_data_hash of the new collection.
1540 sub create_output_collection
1541 {
1542   Log (undef, "collate");
1543
1544   my ($child_out, $child_in);
1545   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1546 import arvados
1547 import sys
1548 print (arvados.api("v1").collections().
1549        create(body={"manifest_text": sys.stdin.read()}).
1550        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1551 }, retry_count());
1552
1553   my $task_idx = -1;
1554   my $manifest_size = 0;
1555   for (@jobstep)
1556   {
1557     ++$task_idx;
1558     my $output = $_->{'arvados_task'}->{output};
1559     next if (!defined($output));
1560     my $next_write;
1561     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1562       $next_write = fetch_block($output);
1563     } else {
1564       $next_write = $output;
1565     }
1566     if (defined($next_write)) {
1567       if (!defined(syswrite($child_in, $next_write))) {
1568         # There's been an error writing.  Stop the loop.
1569         # We'll log details about the exit code later.
1570         last;
1571       } else {
1572         $manifest_size += length($next_write);
1573       }
1574     } else {
1575       my $uuid = $_->{'arvados_task'}->{'uuid'};
1576       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1577       $main::success = 0;
1578     }
1579   }
1580   close($child_in);
1581   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1582
1583   my $joboutput;
1584   my $s = IO::Select->new($child_out);
1585   if ($s->can_read(120)) {
1586     sysread($child_out, $joboutput, 1024 * 1024);
1587     waitpid($pid, 0);
1588     if ($?) {
1589       Log(undef, "output collection creation exited " . exit_status_s($?));
1590       $joboutput = undef;
1591     } else {
1592       chomp($joboutput);
1593     }
1594   } else {
1595     Log (undef, "timed out while creating output collection");
1596     foreach my $signal (2, 2, 2, 15, 15, 9) {
1597       kill($signal, $pid);
1598       last if waitpid($pid, WNOHANG) == -1;
1599       sleep(1);
1600     }
1601   }
1602   close($child_out);
1603
1604   return $joboutput;
1605 }
1606
1607 # Calls create_output_collection, logs the result, and returns it.
1608 # If that was successful, save that as the output in the job record.
1609 sub save_output_collection {
1610   my $collated_output = create_output_collection();
1611
1612   if (!$collated_output) {
1613     Log(undef, "Failed to write output collection");
1614   }
1615   else {
1616     Log(undef, "job output $collated_output");
1617     $Job->update_attributes('output' => $collated_output);
1618   }
1619   return $collated_output;
1620 }
1621
1622 sub killem
1623 {
1624   foreach (@_)
1625   {
1626     my $sig = 2;                # SIGINT first
1627     if (exists $proc{$_}->{"sent_$sig"} &&
1628         time - $proc{$_}->{"sent_$sig"} > 4)
1629     {
1630       $sig = 15;                # SIGTERM if SIGINT doesn't work
1631     }
1632     if (exists $proc{$_}->{"sent_$sig"} &&
1633         time - $proc{$_}->{"sent_$sig"} > 4)
1634     {
1635       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1636     }
1637     if (!exists $proc{$_}->{"sent_$sig"})
1638     {
1639       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1640       kill $sig, $_;
1641       select (undef, undef, undef, 0.1);
1642       if ($sig == 2)
1643       {
1644         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1645       }
1646       $proc{$_}->{"sent_$sig"} = time;
1647       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1648     }
1649   }
1650 }
1651
1652
1653 sub fhbits
1654 {
1655   my($bits);
1656   for (@_) {
1657     vec($bits,fileno($_),1) = 1;
1658   }
1659   $bits;
1660 }
1661
1662
1663 # Send log output to Keep via arv-put.
1664 #
1665 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1666 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1667 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1668 # $log_pipe_pid is the pid of the arv-put subprocess.
1669 #
1670 # The only functions that should access these variables directly are:
1671 #
1672 # log_writer_start($logfilename)
1673 #     Starts an arv-put pipe, reading data on stdin and writing it to
1674 #     a $logfilename file in an output collection.
1675 #
1676 # log_writer_read_output([$timeout])
1677 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1678 #     Passes $timeout to the select() call, with a default of 0.01.
1679 #     Returns the result of the last read() call on $log_pipe_out, or
1680 #     -1 if read() wasn't called because select() timed out.
1681 #     Only other log_writer_* functions should need to call this.
1682 #
1683 # log_writer_send($txt)
1684 #     Writes $txt to the output log collection.
1685 #
1686 # log_writer_finish()
1687 #     Closes the arv-put pipe and returns the output that it produces.
1688 #
1689 # log_writer_is_active()
1690 #     Returns a true value if there is currently a live arv-put
1691 #     process, false otherwise.
1692 #
1693 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1694     $log_pipe_pid);
1695
1696 sub log_writer_start($)
1697 {
1698   my $logfilename = shift;
1699   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1700                         'arv-put',
1701                         '--stream',
1702                         '--retries', '3',
1703                         '--filename', $logfilename,
1704                         '-');
1705   $log_pipe_out_buf = "";
1706   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1707 }
1708
1709 sub log_writer_read_output {
1710   my $timeout = shift || 0.01;
1711   my $read = -1;
1712   while ($read && $log_pipe_out_select->can_read($timeout)) {
1713     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1714                  length($log_pipe_out_buf));
1715   }
1716   if (!defined($read)) {
1717     Log(undef, "error reading log manifest from arv-put: $!");
1718   }
1719   return $read;
1720 }
1721
1722 sub log_writer_send($)
1723 {
1724   my $txt = shift;
1725   print $log_pipe_in $txt;
1726   log_writer_read_output();
1727 }
1728
1729 sub log_writer_finish()
1730 {
1731   return unless $log_pipe_pid;
1732
1733   close($log_pipe_in);
1734
1735   my $logger_failed = 0;
1736   my $read_result = log_writer_read_output(120);
1737   if ($read_result == -1) {
1738     $logger_failed = -1;
1739     Log (undef, "timed out reading from 'arv-put'");
1740   } elsif ($read_result != 0) {
1741     $logger_failed = -2;
1742     Log(undef, "failed to read arv-put log manifest to EOF");
1743   }
1744
1745   waitpid($log_pipe_pid, 0);
1746   if ($?) {
1747     $logger_failed ||= $?;
1748     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1749   }
1750
1751   close($log_pipe_out);
1752   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1753   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1754       $log_pipe_out_select = undef;
1755
1756   return $arv_put_output;
1757 }
1758
1759 sub log_writer_is_active() {
1760   return $log_pipe_pid;
1761 }
1762
1763 sub Log                         # ($jobstep_id, $logmessage)
1764 {
1765   if ($_[1] =~ /\n/) {
1766     for my $line (split (/\n/, $_[1])) {
1767       Log ($_[0], $line);
1768     }
1769     return;
1770   }
1771   my $fh = select STDERR; $|=1; select $fh;
1772   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1773   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1774   $message .= "\n";
1775   my $datetime;
1776   if (log_writer_is_active() || -t STDERR) {
1777     my @gmtime = gmtime;
1778     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1779                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1780   }
1781   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1782
1783   if (log_writer_is_active()) {
1784     log_writer_send($datetime . " " . $message);
1785   }
1786 }
1787
1788
1789 sub croak
1790 {
1791   my ($package, $file, $line) = caller;
1792   my $message = "@_ at $file line $line\n";
1793   Log (undef, $message);
1794   freeze() if @jobstep_todo;
1795   create_output_collection() if @jobstep_todo;
1796   cleanup();
1797   save_meta();
1798   die;
1799 }
1800
1801
1802 sub cleanup
1803 {
1804   return unless $Job;
1805   if ($Job->{'state'} eq 'Cancelled') {
1806     $Job->update_attributes('finished_at' => scalar gmtime);
1807   } else {
1808     $Job->update_attributes('state' => 'Failed');
1809   }
1810 }
1811
1812
1813 sub save_meta
1814 {
1815   my $justcheckpoint = shift; # false if this will be the last meta saved
1816   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1817   return unless log_writer_is_active();
1818   my $log_manifest = log_writer_finish();
1819   return unless defined($log_manifest);
1820
1821   if ($Job->{log}) {
1822     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1823     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1824   }
1825
1826   my $log_coll = api_call(
1827     "collections/create", ensure_unique_name => 1, collection => {
1828       manifest_text => $log_manifest,
1829       owner_uuid => $Job->{owner_uuid},
1830       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1831     });
1832   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1833   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1834 }
1835
1836
1837 sub freeze_if_want_freeze
1838 {
1839   if ($main::please_freeze)
1840   {
1841     release_allocation();
1842     if (@_)
1843     {
1844       # kill some srun procs before freeze+stop
1845       map { $proc{$_} = {} } @_;
1846       while (%proc)
1847       {
1848         killem (keys %proc);
1849         select (undef, undef, undef, 0.1);
1850         my $died;
1851         while (($died = waitpid (-1, WNOHANG)) > 0)
1852         {
1853           delete $proc{$died};
1854         }
1855       }
1856     }
1857     freeze();
1858     create_output_collection();
1859     cleanup();
1860     save_meta();
1861     exit 1;
1862   }
1863 }
1864
1865
1866 sub freeze
1867 {
1868   Log (undef, "Freeze not implemented");
1869   return;
1870 }
1871
1872
1873 sub thaw
1874 {
1875   croak ("Thaw not implemented");
1876 }
1877
1878
1879 sub freezequote
1880 {
1881   my $s = shift;
1882   $s =~ s/\\/\\\\/g;
1883   $s =~ s/\n/\\n/g;
1884   return $s;
1885 }
1886
1887
1888 sub freezeunquote
1889 {
1890   my $s = shift;
1891   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1892   return $s;
1893 }
1894
1895
1896 sub srun
1897 {
1898   my $srunargs = shift;
1899   my $execargs = shift;
1900   my $opts = shift || {};
1901   my $stdin = shift;
1902   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1903
1904   $Data::Dumper::Terse = 1;
1905   $Data::Dumper::Indent = 0;
1906   my $show_cmd = Dumper($args);
1907   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1908   $show_cmd =~ s/\n/ /g;
1909   if ($opts->{fork}) {
1910     Log(undef, "starting: $show_cmd");
1911   } else {
1912     # This is a child process: parent is in charge of reading our
1913     # stderr and copying it to Log() if needed.
1914     warn "starting: $show_cmd\n";
1915   }
1916
1917   if (defined $stdin) {
1918     my $child = open STDIN, "-|";
1919     defined $child or die "no fork: $!";
1920     if ($child == 0) {
1921       print $stdin or die $!;
1922       close STDOUT or die $!;
1923       exit 0;
1924     }
1925   }
1926
1927   return system (@$args) if $opts->{fork};
1928
1929   exec @$args;
1930   warn "ENV size is ".length(join(" ",%ENV));
1931   die "exec failed: $!: @$args";
1932 }
1933
1934
1935 sub ban_node_by_slot {
1936   # Don't start any new jobsteps on this node for 60 seconds
1937   my $slotid = shift;
1938   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1939   $slot[$slotid]->{node}->{hold_count}++;
1940   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1941 }
1942
1943 sub must_lock_now
1944 {
1945   my ($lockfile, $error_message) = @_;
1946   open L, ">", $lockfile or croak("$lockfile: $!");
1947   if (!flock L, LOCK_EX|LOCK_NB) {
1948     croak("Can't lock $lockfile: $error_message\n");
1949   }
1950 }
1951
1952 sub find_docker_image {
1953   # Given a Keep locator, check to see if it contains a Docker image.
1954   # If so, return its stream name and Docker hash.
1955   # If not, return undef for both values.
1956   my $locator = shift;
1957   my ($streamname, $filename);
1958   my $image = api_call("collections/get", uuid => $locator);
1959   if ($image) {
1960     foreach my $line (split(/\n/, $image->{manifest_text})) {
1961       my @tokens = split(/\s+/, $line);
1962       next if (!@tokens);
1963       $streamname = shift(@tokens);
1964       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1965         if (defined($filename)) {
1966           return (undef, undef);  # More than one file in the Collection.
1967         } else {
1968           $filename = (split(/:/, $filedata, 3))[2];
1969         }
1970       }
1971     }
1972   }
1973   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1974     return ($streamname, $1);
1975   } else {
1976     return (undef, undef);
1977   }
1978 }
1979
1980 sub retry_count {
1981   # Calculate the number of times an operation should be retried,
1982   # assuming exponential backoff, and that we're willing to retry as
1983   # long as tasks have been running.  Enforce a minimum of 3 retries.
1984   my ($starttime, $endtime, $timediff, $retries);
1985   if (@jobstep) {
1986     $starttime = $jobstep[0]->{starttime};
1987     $endtime = $jobstep[-1]->{finishtime};
1988   }
1989   if (!defined($starttime)) {
1990     $timediff = 0;
1991   } elsif (!defined($endtime)) {
1992     $timediff = time - $starttime;
1993   } else {
1994     $timediff = ($endtime - $starttime) - (time - $endtime);
1995   }
1996   if ($timediff > 0) {
1997     $retries = int(log($timediff) / log(2));
1998   } else {
1999     $retries = 1;  # Use the minimum.
2000   }
2001   return ($retries > 3) ? $retries : 3;
2002 }
2003
2004 sub retry_op {
2005   # Pass in two function references.
2006   # This method will be called with the remaining arguments.
2007   # If it dies, retry it with exponential backoff until it succeeds,
2008   # or until the current retry_count is exhausted.  After each failure
2009   # that can be retried, the second function will be called with
2010   # the current try count (0-based), next try time, and error message.
2011   my $operation = shift;
2012   my $retry_callback = shift;
2013   my $retries = retry_count();
2014   foreach my $try_count (0..$retries) {
2015     my $next_try = time + (2 ** $try_count);
2016     my $result = eval { $operation->(@_); };
2017     if (!$@) {
2018       return $result;
2019     } elsif ($try_count < $retries) {
2020       $retry_callback->($try_count, $next_try, $@);
2021       my $sleep_time = $next_try - time;
2022       sleep($sleep_time) if ($sleep_time > 0);
2023     }
2024   }
2025   # Ensure the error message ends in a newline, so Perl doesn't add
2026   # retry_op's line number to it.
2027   chomp($@);
2028   die($@ . "\n");
2029 }
2030
2031 sub api_call {
2032   # Pass in a /-separated API method name, and arguments for it.
2033   # This function will call that method, retrying as needed until
2034   # the current retry_count is exhausted, with a log on the first failure.
2035   my $method_name = shift;
2036   my $log_api_retry = sub {
2037     my ($try_count, $next_try_at, $errmsg) = @_;
2038     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2039     $errmsg =~ s/\s/ /g;
2040     $errmsg =~ s/\s+$//;
2041     my $retry_msg;
2042     if ($next_try_at < time) {
2043       $retry_msg = "Retrying.";
2044     } else {
2045       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2046       $retry_msg = "Retrying at $next_try_fmt.";
2047     }
2048     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2049   };
2050   my $method = $arv;
2051   foreach my $key (split(/\//, $method_name)) {
2052     $method = $method->{$key};
2053   }
2054   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2055 }
2056
2057 sub exit_status_s {
2058   # Given a $?, return a human-readable exit code string like "0" or
2059   # "1" or "0 with signal 1" or "1 with signal 11".
2060   my $exitcode = shift;
2061   my $s = $exitcode >> 8;
2062   if ($exitcode & 0x7f) {
2063     $s .= " with signal " . ($exitcode & 0x7f);
2064   }
2065   if ($exitcode & 0x80) {
2066     $s .= " with core dump";
2067   }
2068   return $s;
2069 }
2070
2071 sub handle_readall {
2072   # Pass in a glob reference to a file handle.
2073   # Read all its contents and return them as a string.
2074   my $fh_glob_ref = shift;
2075   local $/ = undef;
2076   return <$fh_glob_ref>;
2077 }
2078
2079 sub tar_filename_n {
2080   my $n = shift;
2081   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2082 }
2083
2084 sub add_git_archive {
2085   # Pass in a git archive command as a string or list, a la system().
2086   # This method will save its output to be included in the archive sent to the
2087   # build script.
2088   my $git_input;
2089   $git_tar_count++;
2090   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2091     croak("Failed to save git archive: $!");
2092   }
2093   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2094   close($git_input);
2095   waitpid($git_pid, 0);
2096   close(GIT_ARCHIVE);
2097   if ($?) {
2098     croak("Failed to save git archive: git exited " . exit_status_s($?));
2099   }
2100 }
2101
2102 sub combined_git_archive {
2103   # Combine all saved tar archives into a single archive, then return its
2104   # contents in a string.  Return undef if no archives have been saved.
2105   if ($git_tar_count < 1) {
2106     return undef;
2107   }
2108   my $base_tar_name = tar_filename_n(1);
2109   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2110     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2111     if ($tar_exit != 0) {
2112       croak("Error preparing build archive: tar -A exited " .
2113             exit_status_s($tar_exit));
2114     }
2115   }
2116   if (!open(GIT_TAR, "<", $base_tar_name)) {
2117     croak("Could not open build archive: $!");
2118   }
2119   my $tar_contents = handle_readall(\*GIT_TAR);
2120   close(GIT_TAR);
2121   return $tar_contents;
2122 }
2123
2124 sub set_nonblocking {
2125   my $fh = shift;
2126   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2127   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2128 }
2129
2130 __DATA__
2131 #!/usr/bin/env perl
2132 #
2133 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2134 # server invokes this script on individual compute nodes, or localhost if we're
2135 # running a job locally.  It gets called in two modes:
2136 #
2137 # * No arguments: Installation mode.  Read a tar archive from the DATA
2138 #   file handle; it includes the Crunch script's source code, and
2139 #   maybe SDKs as well.  Those should be installed in the proper
2140 #   locations.  This runs outside of any Docker container, so don't try to
2141 #   introspect Crunch's runtime environment.
2142 #
2143 # * With arguments: Crunch script run mode.  This script should set up the
2144 #   environment, then run the command specified in the arguments.  This runs
2145 #   inside any Docker container.
2146
2147 use Fcntl ':flock';
2148 use File::Path qw( make_path remove_tree );
2149 use POSIX qw(getcwd);
2150
2151 use constant TASK_TEMPFAIL => 111;
2152
2153 # Map SDK subdirectories to the path environments they belong to.
2154 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2155
2156 my $destdir = $ENV{"CRUNCH_SRC"};
2157 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2158 my $repo = $ENV{"CRUNCH_SRC_URL"};
2159 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2160 my $job_work = $ENV{"JOB_WORK"};
2161 my $task_work = $ENV{"TASK_WORK"};
2162
2163 open(STDOUT_ORIG, ">&", STDOUT);
2164 open(STDERR_ORIG, ">&", STDERR);
2165
2166 for my $dir ($destdir, $job_work, $task_work) {
2167   if ($dir) {
2168     make_path $dir;
2169     -e $dir or die "Failed to create temporary directory ($dir): $!";
2170   }
2171 }
2172
2173 if ($task_work) {
2174   remove_tree($task_work, {keep_root => 1});
2175 }
2176
2177 ### Crunch script run mode
2178 if (@ARGV) {
2179   # We want to do routine logging during task 0 only.  This gives the user
2180   # the information they need, but avoids repeating the information for every
2181   # task.
2182   my $Log;
2183   if ($ENV{TASK_SEQUENCE} eq "0") {
2184     $Log = sub {
2185       my $msg = shift;
2186       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2187     };
2188   } else {
2189     $Log = sub { };
2190   }
2191
2192   my $python_src = "$install_dir/python";
2193   my $venv_dir = "$job_work/.arvados.venv";
2194   my $venv_built = -e "$venv_dir/bin/activate";
2195   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2196     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2197                  "--python=python2.7", $venv_dir);
2198     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2199     $venv_built = 1;
2200     $Log->("Built Python SDK virtualenv");
2201   }
2202
2203   my @pysdk_version_cmd = ("python", "-c",
2204     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2205   if ($venv_built) {
2206     $Log->("Running in Python SDK virtualenv");
2207     @pysdk_version_cmd = ();
2208     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2209     @ARGV = ("/bin/sh", "-ec",
2210              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2211   } elsif (-d $python_src) {
2212     $Log->("Warning: virtualenv not found inside Docker container default " .
2213            "\$PATH. Can't install Python SDK.");
2214   }
2215
2216   if (@pysdk_version_cmd) {
2217     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2218     my $pysdk_version = <$pysdk_version_pipe>;
2219     close($pysdk_version_pipe);
2220     if ($? == 0) {
2221       chomp($pysdk_version);
2222       $Log->("Using Arvados SDK version $pysdk_version");
2223     } else {
2224       # A lot could've gone wrong here, but pretty much all of it means that
2225       # Python won't be able to load the Arvados SDK.
2226       $Log->("Warning: Arvados SDK not found");
2227     }
2228   }
2229
2230   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2231     my $sdk_path = "$install_dir/$sdk_dir";
2232     if (-d $sdk_path) {
2233       if ($ENV{$sdk_envkey}) {
2234         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2235       } else {
2236         $ENV{$sdk_envkey} = $sdk_path;
2237       }
2238       $Log->("Arvados SDK added to %s", $sdk_envkey);
2239     }
2240   }
2241
2242   exec(@ARGV);
2243   die "Cannot exec `@ARGV`: $!";
2244 }
2245
2246 ### Installation mode
2247 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2248 flock L, LOCK_EX;
2249 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2250   # This exact git archive (source + arvados sdk) is already installed
2251   # here, so there's no need to reinstall it.
2252
2253   # We must consume our DATA section, though: otherwise the process
2254   # feeding it to us will get SIGPIPE.
2255   my $buf;
2256   while (read(DATA, $buf, 65536)) { }
2257
2258   exit(0);
2259 }
2260
2261 unlink "$destdir.archive_hash";
2262 mkdir $destdir;
2263
2264 do {
2265   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2266   local $SIG{PIPE} = "IGNORE";
2267   warn "Extracting archive: $archive_hash\n";
2268   # --ignore-zeros is necessary sometimes: depending on how much NUL
2269   # padding tar -A put on our combined archive (which in turn depends
2270   # on the length of the component archives) tar without
2271   # --ignore-zeros will exit before consuming stdin and cause close()
2272   # to fail on the resulting SIGPIPE.
2273   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2274     die "Error launching 'tar -xC $destdir': $!";
2275   }
2276   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2277   # get SIGPIPE.  We must feed it data incrementally.
2278   my $tar_input;
2279   while (read(DATA, $tar_input, 65536)) {
2280     print TARX $tar_input;
2281   }
2282   if(!close(TARX)) {
2283     die "'tar -xC $destdir' exited $?: $!";
2284   }
2285 };
2286
2287 mkdir $install_dir;
2288
2289 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2290 if (-d $sdk_root) {
2291   foreach my $sdk_lang (("python",
2292                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2293     if (-d "$sdk_root/$sdk_lang") {
2294       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2295         die "Failed to install $sdk_lang SDK: $!";
2296       }
2297     }
2298   }
2299 }
2300
2301 my $python_dir = "$install_dir/python";
2302 if ((-d $python_dir) and can_run("python2.7")) {
2303   open(my $egg_info_pipe, "-|",
2304        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2305   my @egg_info_errors = <$egg_info_pipe>;
2306   close($egg_info_pipe);
2307
2308   if ($?) {
2309     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2310       # egg_info apparently failed because it couldn't ask git for a build tag.
2311       # Specify no build tag.
2312       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2313       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2314       close($pysdk_cfg);
2315     } else {
2316       my $egg_info_exit = $? >> 8;
2317       foreach my $errline (@egg_info_errors) {
2318         warn $errline;
2319       }
2320       warn "python setup.py egg_info failed: exit $egg_info_exit";
2321       exit ($egg_info_exit || 1);
2322     }
2323   }
2324 }
2325
2326 # Hide messages from the install script (unless it fails: shell_or_die
2327 # will show $destdir.log in that case).
2328 open(STDOUT, ">>", "$destdir.log");
2329 open(STDERR, ">&", STDOUT);
2330
2331 if (-e "$destdir/crunch_scripts/install") {
2332     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2333 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2334     # Old version
2335     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2336 } elsif (-e "./install.sh") {
2337     shell_or_die (undef, "./install.sh", $install_dir);
2338 }
2339
2340 if ($archive_hash) {
2341     unlink "$destdir.archive_hash.new";
2342     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2343     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2344 }
2345
2346 close L;
2347
2348 sub can_run {
2349   my $command_name = shift;
2350   open(my $which, "-|", "which", $command_name);
2351   while (<$which>) { }
2352   close($which);
2353   return ($? == 0);
2354 }
2355
2356 sub shell_or_die
2357 {
2358   my $exitcode = shift;
2359
2360   if ($ENV{"DEBUG"}) {
2361     print STDERR "@_\n";
2362   }
2363   if (system (@_) != 0) {
2364     my $err = $!;
2365     my $code = $?;
2366     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2367     open STDERR, ">&STDERR_ORIG";
2368     system ("cat $destdir.log >&2");
2369     warn "@_ failed ($err): $exitstatus";
2370     if (defined($exitcode)) {
2371       exit $exitcode;
2372     }
2373     else {
2374       exit (($code >> 8) || 1);
2375     }
2376   }
2377 }
2378
2379 __DATA__