Fixes reference to $js->{killtime} instead of $procinfo->{killtime}
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 my $docker_run_args = "";
131 GetOptions('force-unlock' => \$force_unlock,
132            'git-dir=s' => \$git_dir,
133            'job=s' => \$jobspec,
134            'job-api-token=s' => \$job_api_token,
135            'no-clear-tmp' => \$no_clear_tmp,
136            'resume-stash=s' => \$resume_stash,
137            'docker-bin=s' => \$docker_bin,
138            'docker-run-args=s' => \$docker_run_args,
139     );
140
141 if (defined $job_api_token) {
142   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
143 }
144
145 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
146
147
148 $SIG{'USR1'} = sub
149 {
150   $main::ENV{CRUNCH_DEBUG} = 1;
151 };
152 $SIG{'USR2'} = sub
153 {
154   $main::ENV{CRUNCH_DEBUG} = 0;
155 };
156
157 my $arv = Arvados->new('apiVersion' => 'v1');
158
159 my $Job;
160 my $job_id;
161 my $dbh;
162 my $sth;
163 my @jobstep;
164
165 my $local_job;
166 if ($jobspec =~ /^[-a-z\d]+$/)
167 {
168   # $jobspec is an Arvados UUID, not a JSON job specification
169   $Job = api_call("jobs/get", uuid => $jobspec);
170   $local_job = 0;
171 }
172 else
173 {
174   $local_job = JSON::decode_json($jobspec);
175 }
176
177
178 # Make sure our workers (our slurm nodes, localhost, or whatever) are
179 # at least able to run basic commands: they aren't down or severely
180 # misconfigured.
181 my $cmd = ['true'];
182 if (($Job || $local_job)->{docker_image_locator}) {
183   $cmd = [$docker_bin, 'ps', '-q'];
184 }
185 Log(undef, "Sanity check is `@$cmd`");
186 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
187      $cmd,
188      {fork => 1});
189 if ($? != 0) {
190   Log(undef, "Sanity check failed: ".exit_status_s($?));
191   exit EX_TEMPFAIL;
192 }
193 Log(undef, "Sanity check OK");
194
195
196 my $User = api_call("users/current");
197
198 if (!$local_job) {
199   if (!$force_unlock) {
200     # Claim this job, and make sure nobody else does
201     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
202     if ($@) {
203       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
204       exit EX_TEMPFAIL;
205     };
206   }
207 }
208 else
209 {
210   if (!$resume_stash)
211   {
212     map { croak ("No $_ specified") unless $local_job->{$_} }
213     qw(script script_version script_parameters);
214   }
215
216   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
217   $local_job->{'started_at'} = gmtime;
218   $local_job->{'state'} = 'Running';
219
220   $Job = api_call("jobs/create", job => $local_job);
221 }
222 $job_id = $Job->{'uuid'};
223
224 my $keep_logfile = $job_id . '.log.txt';
225 log_writer_start($keep_logfile);
226
227 $Job->{'runtime_constraints'} ||= {};
228 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
229 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
230
231 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
232 if ($? == 0) {
233   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
234   chomp($gem_versions);
235   chop($gem_versions);  # Closing parentheses
236 } else {
237   $gem_versions = "";
238 }
239 Log(undef,
240     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
241
242 Log (undef, "check slurm allocation");
243 my @slot;
244 my @node;
245 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
246 my @sinfo;
247 if (!$have_slurm)
248 {
249   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
250   push @sinfo, "$localcpus localhost";
251 }
252 if (exists $ENV{SLURM_NODELIST})
253 {
254   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
255 }
256 foreach (@sinfo)
257 {
258   my ($ncpus, $slurm_nodelist) = split;
259   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
260
261   my @nodelist;
262   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
263   {
264     my $nodelist = $1;
265     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
266     {
267       my $ranges = $1;
268       foreach (split (",", $ranges))
269       {
270         my ($a, $b);
271         if (/(\d+)-(\d+)/)
272         {
273           $a = $1;
274           $b = $2;
275         }
276         else
277         {
278           $a = $_;
279           $b = $_;
280         }
281         push @nodelist, map {
282           my $n = $nodelist;
283           $n =~ s/\[[-,\d]+\]/$_/;
284           $n;
285         } ($a..$b);
286       }
287     }
288     else
289     {
290       push @nodelist, $nodelist;
291     }
292   }
293   foreach my $nodename (@nodelist)
294   {
295     Log (undef, "node $nodename - $ncpus slots");
296     my $node = { name => $nodename,
297                  ncpus => $ncpus,
298                  # The number of consecutive times a task has been dispatched
299                  # to this node and failed.
300                  losing_streak => 0,
301                  # The number of consecutive times that SLURM has reported
302                  # a node failure since the last successful task.
303                  fail_count => 0,
304                  # Don't dispatch work to this node until this time
305                  # (in seconds since the epoch) has passed.
306                  hold_until => 0 };
307     foreach my $cpu (1..$ncpus)
308     {
309       push @slot, { node => $node,
310                     cpu => $cpu };
311     }
312   }
313   push @node, @nodelist;
314 }
315
316
317
318 # Ensure that we get one jobstep running on each allocated node before
319 # we start overloading nodes with concurrent steps
320
321 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
322
323
324 $Job->update_attributes(
325   'tasks_summary' => { 'failed' => 0,
326                        'todo' => 1,
327                        'running' => 0,
328                        'done' => 0 });
329
330 Log (undef, "start");
331 $SIG{'INT'} = sub { $main::please_freeze = 1; };
332 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
333 $SIG{'TERM'} = \&croak;
334 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
335 $SIG{'ALRM'} = sub { $main::please_info = 1; };
336 $SIG{'CONT'} = sub { $main::please_continue = 1; };
337 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
338
339 $main::please_freeze = 0;
340 $main::please_info = 0;
341 $main::please_continue = 0;
342 $main::please_refresh = 0;
343 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
344
345 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
346 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
347 $ENV{"JOB_UUID"} = $job_id;
348
349
350 my @jobstep_todo = ();
351 my @jobstep_done = ();
352 my @jobstep_tomerge = ();
353 my $jobstep_tomerge_level = 0;
354 my $squeue_checked = 0;
355 my $latest_refresh = scalar time;
356
357
358
359 if (defined $Job->{thawedfromkey})
360 {
361   thaw ($Job->{thawedfromkey});
362 }
363 else
364 {
365   my $first_task = api_call("job_tasks/create", job_task => {
366     'job_uuid' => $Job->{'uuid'},
367     'sequence' => 0,
368     'qsequence' => 0,
369     'parameters' => {},
370   });
371   push @jobstep, { 'level' => 0,
372                    'failures' => 0,
373                    'arvados_task' => $first_task,
374                  };
375   push @jobstep_todo, 0;
376 }
377
378
379 if (!$have_slurm)
380 {
381   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
382 }
383
384 my $build_script = handle_readall(\*DATA);
385 my $nodelist = join(",", @node);
386 my $git_tar_count = 0;
387
388 if (!defined $no_clear_tmp) {
389   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
390   Log (undef, "Clean work dirs");
391
392   my $cleanpid = fork();
393   if ($cleanpid == 0)
394   {
395     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
396     # Then clean up work directories.
397     # TODO: When #5036 is done and widely deployed, we can limit mount's
398     # -t option to simply fuse.keep.
399     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
400           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
401     exit (1);
402   }
403   while (1)
404   {
405     last if $cleanpid == waitpid (-1, WNOHANG);
406     freeze_if_want_freeze ($cleanpid);
407     select (undef, undef, undef, 0.1);
408   }
409   if ($?) {
410     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
411     exit(EX_RETRY_UNLOCKED);
412   }
413 }
414
415 # If this job requires a Docker image, install that.
416 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
417 if ($docker_locator = $Job->{docker_image_locator}) {
418   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
419   if (!$docker_hash)
420   {
421     croak("No Docker image hash found from locator $docker_locator");
422   }
423   $docker_stream =~ s/^\.//;
424   my $docker_install_script = qq{
425 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
426     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
427 fi
428 };
429   my $docker_pid = fork();
430   if ($docker_pid == 0)
431   {
432     srun (["srun", "--nodelist=" . join(',', @node)],
433           ["/bin/sh", "-ec", $docker_install_script]);
434     exit ($?);
435   }
436   while (1)
437   {
438     last if $docker_pid == waitpid (-1, WNOHANG);
439     freeze_if_want_freeze ($docker_pid);
440     select (undef, undef, undef, 0.1);
441   }
442   if ($? != 0)
443   {
444     croak("Installing Docker image from $docker_locator exited "
445           .exit_status_s($?));
446   }
447
448   # Determine whether this version of Docker supports memory+swap limits.
449   srun(["srun", "--nodelist=" . $node[0]],
450        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
451       {fork => 1});
452   $docker_limitmem = ($? == 0);
453
454   # Find a non-root Docker user to use.
455   # Tries the default user for the container, then 'crunch', then 'nobody',
456   # testing for whether the actual user id is non-zero.  This defends against
457   # mistakes but not malice, but we intend to harden the security in the future
458   # so we don't want anyone getting used to their jobs running as root in their
459   # Docker containers.
460   my @tryusers = ("", "crunch", "nobody");
461   foreach my $try_user (@tryusers) {
462     my $try_user_arg;
463     if ($try_user eq "") {
464       Log(undef, "Checking if container default user is not UID 0");
465       $try_user_arg = "";
466     } else {
467       Log(undef, "Checking if user '$try_user' is not UID 0");
468       $try_user_arg = "--user=$try_user";
469     }
470     srun(["srun", "--nodelist=" . $node[0]],
471          ["/bin/sh", "-ec",
472           "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
473           " test \$a -ne 0"],
474          {fork => 1});
475     if ($? == 0) {
476       $dockeruserarg = $try_user_arg;
477       if ($try_user eq "") {
478         Log(undef, "Container will run with default user");
479       } else {
480         Log(undef, "Container will run with $dockeruserarg");
481       }
482       last;
483     }
484   }
485
486   if (!defined $dockeruserarg) {
487     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
488   }
489
490   if ($Job->{arvados_sdk_version}) {
491     # The job also specifies an Arvados SDK version.  Add the SDKs to the
492     # tar file for the build script to install.
493     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
494                        $Job->{arvados_sdk_version}));
495     add_git_archive("git", "--git-dir=$git_dir", "archive",
496                     "--prefix=.arvados.sdk/",
497                     $Job->{arvados_sdk_version}, "sdk");
498   }
499 }
500
501 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
502   # If script_version looks like an absolute path, *and* the --git-dir
503   # argument was not given -- which implies we were not invoked by
504   # crunch-dispatch -- we will use the given path as a working
505   # directory instead of resolving script_version to a git commit (or
506   # doing anything else with git).
507   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
508   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
509 }
510 else {
511   # Resolve the given script_version to a git commit sha1. Also, if
512   # the repository is remote, clone it into our local filesystem: this
513   # ensures "git archive" will work, and is necessary to reliably
514   # resolve a symbolic script_version like "master^".
515   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
516
517   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
518
519   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
520
521   # If we're running under crunch-dispatch, it will have already
522   # pulled the appropriate source tree into its own repository, and
523   # given us that repo's path as $git_dir.
524   #
525   # If we're running a "local" job, we might have to fetch content
526   # from a remote repository.
527   #
528   # (Currently crunch-dispatch gives a local path with --git-dir, but
529   # we might as well accept URLs there too in case it changes its
530   # mind.)
531   my $repo = $git_dir || $Job->{'repository'};
532
533   # Repository can be remote or local. If remote, we'll need to fetch it
534   # to a local dir before doing `git log` et al.
535   my $repo_location;
536
537   if ($repo =~ m{://|^[^/]*:}) {
538     # $repo is a git url we can clone, like git:// or https:// or
539     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
540     # not recognized here because distinguishing that from a local
541     # path is too fragile. If you really need something strange here,
542     # use the ssh:// form.
543     $repo_location = 'remote';
544   } elsif ($repo =~ m{^\.*/}) {
545     # $repo is a local path to a git index. We'll also resolve ../foo
546     # to ../foo/.git if the latter is a directory. To help
547     # disambiguate local paths from named hosted repositories, this
548     # form must be given as ./ or ../ if it's a relative path.
549     if (-d "$repo/.git") {
550       $repo = "$repo/.git";
551     }
552     $repo_location = 'local';
553   } else {
554     # $repo is none of the above. It must be the name of a hosted
555     # repository.
556     my $arv_repo_list = api_call("repositories/list",
557                                  'filters' => [['name','=',$repo]]);
558     my @repos_found = @{$arv_repo_list->{'items'}};
559     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
560     if ($n_found > 0) {
561       Log(undef, "Repository '$repo' -> "
562           . join(", ", map { $_->{'uuid'} } @repos_found));
563     }
564     if ($n_found != 1) {
565       croak("Error: Found $n_found repositories with name '$repo'.");
566     }
567     $repo = $repos_found[0]->{'fetch_url'};
568     $repo_location = 'remote';
569   }
570   Log(undef, "Using $repo_location repository '$repo'");
571   $ENV{"CRUNCH_SRC_URL"} = $repo;
572
573   # Resolve given script_version (we'll call that $treeish here) to a
574   # commit sha1 ($commit).
575   my $treeish = $Job->{'script_version'};
576   my $commit;
577   if ($repo_location eq 'remote') {
578     # We minimize excess object-fetching by re-using the same bare
579     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
580     # just keep adding remotes to it as needed.
581     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
582     my $gitcmd = "git --git-dir=\Q$local_repo\E";
583
584     # Set up our local repo for caching remote objects, making
585     # archives, etc.
586     if (!-d $local_repo) {
587       make_path($local_repo) or croak("Error: could not create $local_repo");
588     }
589     # This works (exits 0 and doesn't delete fetched objects) even
590     # if $local_repo is already initialized:
591     `$gitcmd init --bare`;
592     if ($?) {
593       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
594     }
595
596     # If $treeish looks like a hash (or abbrev hash) we look it up in
597     # our local cache first, since that's cheaper. (We don't want to
598     # do that with tags/branches though -- those change over time, so
599     # they should always be resolved by the remote repo.)
600     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
601       # Hide stderr because it's normal for this to fail:
602       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
603       if ($? == 0 &&
604           # Careful not to resolve a branch named abcdeff to commit 1234567:
605           $sha1 =~ /^$treeish/ &&
606           $sha1 =~ /^([0-9a-f]{40})$/s) {
607         $commit = $1;
608         Log(undef, "Commit $commit already present in $local_repo");
609       }
610     }
611
612     if (!defined $commit) {
613       # If $treeish isn't just a hash or abbrev hash, or isn't here
614       # yet, we need to fetch the remote to resolve it correctly.
615
616       # First, remove all local heads. This prevents a name that does
617       # not exist on the remote from resolving to (or colliding with)
618       # a previously fetched branch or tag (possibly from a different
619       # remote).
620       remove_tree("$local_repo/refs/heads", {keep_root => 1});
621
622       Log(undef, "Fetching objects from $repo to $local_repo");
623       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
624       if ($?) {
625         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
626       }
627     }
628
629     # Now that the data is all here, we will use our local repo for
630     # the rest of our git activities.
631     $repo = $local_repo;
632   }
633
634   my $gitcmd = "git --git-dir=\Q$repo\E";
635   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
636   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
637     croak("`$gitcmd rev-list` exited "
638           .exit_status_s($?)
639           .", '$treeish' not found, giving up");
640   }
641   $commit = $1;
642   Log(undef, "Version $treeish is commit $commit");
643
644   if ($commit ne $Job->{'script_version'}) {
645     # Record the real commit id in the database, frozentokey, logs,
646     # etc. -- instead of an abbreviation or a branch name which can
647     # become ambiguous or point to a different commit in the future.
648     if (!$Job->update_attributes('script_version' => $commit)) {
649       croak("Error: failed to update job's script_version attribute");
650     }
651   }
652
653   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
654   add_git_archive("$gitcmd archive ''\Q$commit\E");
655 }
656
657 my $git_archive = combined_git_archive();
658 if (!defined $git_archive) {
659   Log(undef, "Skip install phase (no git archive)");
660   if ($have_slurm) {
661     Log(undef, "Warning: This probably means workers have no source tree!");
662   }
663 }
664 else {
665   my $install_exited;
666   my $install_script_tries_left = 3;
667   for (my $attempts = 0; $attempts < 3; $attempts++) {
668     Log(undef, "Run install script on all workers");
669
670     my @srunargs = ("srun",
671                     "--nodelist=$nodelist",
672                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
673     my @execargs = ("sh", "-c",
674                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
675
676     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
677     my ($install_stderr_r, $install_stderr_w);
678     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
679     set_nonblocking($install_stderr_r);
680     my $installpid = fork();
681     if ($installpid == 0)
682     {
683       close($install_stderr_r);
684       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
685       open(STDOUT, ">&", $install_stderr_w);
686       open(STDERR, ">&", $install_stderr_w);
687       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
688       exit (1);
689     }
690     close($install_stderr_w);
691     # Tell freeze_if_want_freeze how to kill the child, otherwise the
692     # "waitpid(installpid)" loop won't get interrupted by a freeze:
693     $proc{$installpid} = {};
694     my $stderr_buf = '';
695     # Track whether anything appears on stderr other than slurm errors
696     # ("srun: ...") and the "starting: ..." message printed by the
697     # srun subroutine itself:
698     my $stderr_anything_from_script = 0;
699     my $match_our_own_errors = '^(srun: error: |starting: \[)';
700     while ($installpid != waitpid(-1, WNOHANG)) {
701       freeze_if_want_freeze ($installpid);
702       # Wait up to 0.1 seconds for something to appear on stderr, then
703       # do a non-blocking read.
704       my $bits = fhbits($install_stderr_r);
705       select ($bits, undef, $bits, 0.1);
706       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
707       {
708         while ($stderr_buf =~ /^(.*?)\n/) {
709           my $line = $1;
710           substr $stderr_buf, 0, 1+length($line), "";
711           Log(undef, "stderr $line");
712           if ($line !~ /$match_our_own_errors/) {
713             $stderr_anything_from_script = 1;
714           }
715         }
716       }
717     }
718     delete $proc{$installpid};
719     $install_exited = $?;
720     close($install_stderr_r);
721     if (length($stderr_buf) > 0) {
722       if ($stderr_buf !~ /$match_our_own_errors/) {
723         $stderr_anything_from_script = 1;
724       }
725       Log(undef, "stderr $stderr_buf")
726     }
727
728     Log (undef, "Install script exited ".exit_status_s($install_exited));
729     last if $install_exited == 0 || $main::please_freeze;
730     # If the install script fails but doesn't print an error message,
731     # the next thing anyone is likely to do is just run it again in
732     # case it was a transient problem like "slurm communication fails
733     # because the network isn't reliable enough". So we'll just do
734     # that ourselves (up to 3 attempts in total). OTOH, if there is an
735     # error message, the problem is more likely to have a real fix and
736     # we should fail the job so the fixing process can start, instead
737     # of doing 2 more attempts.
738     last if $stderr_anything_from_script;
739   }
740
741   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
742     unlink($tar_filename);
743   }
744
745   if ($install_exited != 0) {
746     croak("Giving up");
747   }
748 }
749
750 foreach (qw (script script_version script_parameters runtime_constraints))
751 {
752   Log (undef,
753        "$_ " .
754        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
755 }
756 foreach (split (/\n/, $Job->{knobs}))
757 {
758   Log (undef, "knob " . $_);
759 }
760
761
762
763 $main::success = undef;
764
765
766
767 ONELEVEL:
768
769 my $thisround_succeeded = 0;
770 my $thisround_failed = 0;
771 my $thisround_failed_multiple = 0;
772 my $working_slot_count = scalar(@slot);
773
774 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
775                        or $a <=> $b } @jobstep_todo;
776 my $level = $jobstep[$jobstep_todo[0]]->{level};
777
778 my $initial_tasks_this_level = 0;
779 foreach my $id (@jobstep_todo) {
780   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
781 }
782
783 # If the number of tasks scheduled at this level #T is smaller than the number
784 # of slots available #S, only use the first #T slots, or the first slot on
785 # each node, whichever number is greater.
786 #
787 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
788 # based on these numbers.  Using fewer slots makes more resources available
789 # to each individual task, which should normally be a better strategy when
790 # there are fewer of them running with less parallelism.
791 #
792 # Note that this calculation is not redone if the initial tasks at
793 # this level queue more tasks at the same level.  This may harm
794 # overall task throughput for that level.
795 my @freeslot;
796 if ($initial_tasks_this_level < @node) {
797   @freeslot = (0..$#node);
798 } elsif ($initial_tasks_this_level < @slot) {
799   @freeslot = (0..$initial_tasks_this_level - 1);
800 } else {
801   @freeslot = (0..$#slot);
802 }
803 my $round_num_freeslots = scalar(@freeslot);
804 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
805
806 my %round_max_slots = ();
807 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
808   my $this_slot = $slot[$freeslot[$ii]];
809   my $node_name = $this_slot->{node}->{name};
810   $round_max_slots{$node_name} ||= $this_slot->{cpu};
811   last if (scalar(keys(%round_max_slots)) >= @node);
812 }
813
814 Log(undef, "start level $level with $round_num_freeslots slots");
815 my @holdslot;
816 my %reader;
817 my $progress_is_dirty = 1;
818 my $progress_stats_updated = 0;
819
820 update_progress_stats();
821
822
823 THISROUND:
824 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
825 {
826   # Don't create new tasks if we already know the job's final result.
827   last if defined($main::success);
828
829   my $id = $jobstep_todo[$todo_ptr];
830   my $Jobstep = $jobstep[$id];
831   if ($Jobstep->{level} != $level)
832   {
833     next;
834   }
835
836   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
837   set_nonblocking($reader{$id});
838
839   my $childslot = $freeslot[0];
840   my $childnode = $slot[$childslot]->{node};
841   my $childslotname = join (".",
842                             $slot[$childslot]->{node}->{name},
843                             $slot[$childslot]->{cpu});
844
845   my $childpid = fork();
846   if ($childpid == 0)
847   {
848     $SIG{'INT'} = 'DEFAULT';
849     $SIG{'QUIT'} = 'DEFAULT';
850     $SIG{'TERM'} = 'DEFAULT';
851
852     foreach (values (%reader))
853     {
854       close($_);
855     }
856     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
857     open(STDOUT,">&writer");
858     open(STDERR,">&writer");
859
860     undef $dbh;
861     undef $sth;
862
863     delete $ENV{"GNUPGHOME"};
864     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
865     $ENV{"TASK_QSEQUENCE"} = $id;
866     $ENV{"TASK_SEQUENCE"} = $level;
867     $ENV{"JOB_SCRIPT"} = $Job->{script};
868     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
869       $param =~ tr/a-z/A-Z/;
870       $ENV{"JOB_PARAMETER_$param"} = $value;
871     }
872     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
873     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
874     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
875     $ENV{"HOME"} = $ENV{"TASK_WORK"};
876     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
877     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
878     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
879
880     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
881
882     $ENV{"GZIP"} = "-n";
883
884     my @srunargs = (
885       "srun",
886       "--nodelist=".$childnode->{name},
887       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
888       "--job-name=$job_id.$id.$$",
889         );
890
891     my $stdbuf = " stdbuf --output=0 --error=0 ";
892
893     my $arv_file_cache = "";
894     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
895       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
896     }
897
898     my $command =
899         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
900         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
901         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
902         # These environment variables get used explicitly later in
903         # $command.  No tool is expected to read these values directly.
904         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
905         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
906         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
907         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
908
909     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
910     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
911     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
912
913     if ($docker_hash)
914     {
915       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
916       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
917       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
918       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
919       # We only set memory limits if Docker lets us limit both memory and swap.
920       # Memory limits alone have been supported longer, but subprocesses tend
921       # to get SIGKILL if they exceed that without any swap limit set.
922       # See #5642 for additional background.
923       if ($docker_limitmem) {
924         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
925       }
926
927       # The source tree and $destdir directory (which we have
928       # installed on the worker host) are available in the container,
929       # under the same path.
930       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
931       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
932
933       # Currently, we make the "by_pdh" directory in arv-mount's mount
934       # point appear at /keep inside the container (instead of using
935       # the same path as the host like we do with CRUNCH_SRC and
936       # CRUNCH_INSTALL). However, crunch scripts and utilities must
937       # not rely on this. They must use $TASK_KEEPMOUNT.
938       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
939       $ENV{TASK_KEEPMOUNT} = "/keep";
940
941       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
942       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
943       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
944
945       # TASK_WORK is almost exactly like a docker data volume: it
946       # starts out empty, is writable, and persists until no
947       # containers use it any more. We don't use --volumes-from to
948       # share it with other containers: it is only accessible to this
949       # task, and it goes away when this task stops.
950       #
951       # However, a docker data volume is writable only by root unless
952       # the mount point already happens to exist in the container with
953       # different permissions. Therefore, we [1] assume /tmp already
954       # exists in the image and is writable by the crunch user; [2]
955       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
956       # writable if they are created by docker while setting up the
957       # other --volumes); and [3] create $TASK_WORK inside the
958       # container using $build_script.
959       $command .= "--volume=/tmp ";
960       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
961       $ENV{"HOME"} = $ENV{"TASK_WORK"};
962       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
963
964       # TODO: Share a single JOB_WORK volume across all task
965       # containers on a given worker node, and delete it when the job
966       # ends (and, in case that doesn't work, when the next job
967       # starts).
968       #
969       # For now, use the same approach as TASK_WORK above.
970       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
971
972       while (my ($env_key, $env_val) = each %ENV)
973       {
974         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
975           $command .= "--env=\Q$env_key=$env_val\E ";
976         }
977       }
978       $command .= "--env=\QHOME=$ENV{HOME}\E ";
979       $command .= "\Q$docker_hash\E ";
980
981       if ($Job->{arvados_sdk_version}) {
982         $command .= $stdbuf;
983         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
984       } else {
985         $command .= "/bin/sh -c \'python -c " .
986             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
987             ">&2 2>/dev/null; " .
988             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
989             "if which stdbuf >/dev/null ; then " .
990             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
991             " else " .
992             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
993             " fi\'";
994       }
995     } else {
996       # Non-docker run
997       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
998       $command .= $stdbuf;
999       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
1000     }
1001
1002     my @execargs = ('bash', '-c', $command);
1003     srun (\@srunargs, \@execargs, undef, $build_script);
1004     # exec() failed, we assume nothing happened.
1005     die "srun() failed on build script\n";
1006   }
1007   close("writer");
1008   if (!defined $childpid)
1009   {
1010     close $reader{$id};
1011     delete $reader{$id};
1012     next;
1013   }
1014   shift @freeslot;
1015   $proc{$childpid} = { jobstep => $id,
1016                        time => time,
1017                        slot => $childslot,
1018                        jobstepname => "$job_id.$id.$childpid",
1019                      };
1020   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1021   $slot[$childslot]->{pid} = $childpid;
1022
1023   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1024   Log ($id, "child $childpid started on $childslotname");
1025   $Jobstep->{starttime} = time;
1026   $Jobstep->{node} = $childnode->{name};
1027   $Jobstep->{slotindex} = $childslot;
1028   delete $Jobstep->{stderr};
1029   delete $Jobstep->{finishtime};
1030   delete $Jobstep->{tempfail};
1031
1032   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1033   $Jobstep->{'arvados_task'}->save;
1034
1035   splice @jobstep_todo, $todo_ptr, 1;
1036   --$todo_ptr;
1037
1038   $progress_is_dirty = 1;
1039
1040   while (!@freeslot
1041          ||
1042          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1043   {
1044     last THISROUND if $main::please_freeze;
1045     if ($main::please_info)
1046     {
1047       $main::please_info = 0;
1048       freeze();
1049       create_output_collection();
1050       save_meta(1);
1051       update_progress_stats();
1052     }
1053     my $gotsome
1054         = readfrompipes ()
1055         + reapchildren ();
1056     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1057     {
1058       check_refresh_wanted();
1059       check_squeue();
1060       update_progress_stats();
1061     }
1062     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1063     {
1064       update_progress_stats();
1065     }
1066     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1067                                         $_->{node}->{hold_count} < 4 } @slot);
1068     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1069         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1070     {
1071       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1072           .($thisround_failed+$thisround_succeeded)
1073           .") -- giving up on this round";
1074       Log (undef, $message);
1075       last THISROUND;
1076     }
1077
1078     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1079     for (my $i=$#freeslot; $i>=0; $i--) {
1080       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1081         push @holdslot, (splice @freeslot, $i, 1);
1082       }
1083     }
1084     for (my $i=$#holdslot; $i>=0; $i--) {
1085       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1086         push @freeslot, (splice @holdslot, $i, 1);
1087       }
1088     }
1089
1090     # give up if no nodes are succeeding
1091     if ($working_slot_count < 1) {
1092       Log(undef, "Every node has failed -- giving up");
1093       last THISROUND;
1094     }
1095   }
1096 }
1097
1098
1099 push @freeslot, splice @holdslot;
1100 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1101
1102
1103 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1104 while (%proc)
1105 {
1106   if ($main::please_continue) {
1107     $main::please_continue = 0;
1108     goto THISROUND;
1109   }
1110   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1111   readfrompipes ();
1112   if (!reapchildren())
1113   {
1114     check_refresh_wanted();
1115     check_squeue();
1116     update_progress_stats();
1117     select (undef, undef, undef, 0.1);
1118     killem (keys %proc) if $main::please_freeze;
1119   }
1120 }
1121
1122 update_progress_stats();
1123 freeze_if_want_freeze();
1124
1125
1126 if (!defined $main::success)
1127 {
1128   if (!@jobstep_todo) {
1129     $main::success = 1;
1130   } elsif ($working_slot_count < 1) {
1131     save_output_collection();
1132     save_meta();
1133     exit(EX_RETRY_UNLOCKED);
1134   } elsif ($thisround_succeeded == 0 &&
1135            ($thisround_failed == 0 || $thisround_failed > 4)) {
1136     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1137     Log (undef, $message);
1138     $main::success = 0;
1139   }
1140 }
1141
1142 goto ONELEVEL if !defined $main::success;
1143
1144
1145 release_allocation();
1146 freeze();
1147 my $collated_output = save_output_collection();
1148 Log (undef, "finish");
1149
1150 save_meta();
1151
1152 my $final_state;
1153 if ($collated_output && $main::success) {
1154   $final_state = 'Complete';
1155 } else {
1156   $final_state = 'Failed';
1157 }
1158 $Job->update_attributes('state' => $final_state);
1159
1160 exit (($final_state eq 'Complete') ? 0 : 1);
1161
1162
1163
1164 sub update_progress_stats
1165 {
1166   $progress_stats_updated = time;
1167   return if !$progress_is_dirty;
1168   my ($todo, $done, $running) = (scalar @jobstep_todo,
1169                                  scalar @jobstep_done,
1170                                  scalar keys(%proc));
1171   $Job->{'tasks_summary'} ||= {};
1172   $Job->{'tasks_summary'}->{'todo'} = $todo;
1173   $Job->{'tasks_summary'}->{'done'} = $done;
1174   $Job->{'tasks_summary'}->{'running'} = $running;
1175   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1176   Log (undef, "status: $done done, $running running, $todo todo");
1177   $progress_is_dirty = 0;
1178 }
1179
1180
1181
1182 sub reapchildren
1183 {
1184   my $children_reaped = 0;
1185
1186   while((my $pid = waitpid (-1, WNOHANG)) > 0)
1187   {
1188     my $childstatus = $?;
1189     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1190                     . "."
1191                     . $slot[$proc{$pid}->{slot}]->{cpu});
1192     my $jobstepid = $proc{$pid}->{jobstep};
1193
1194     if (!WIFEXITED($childstatus))
1195     {
1196       # child did not exit (may be temporarily stopped)
1197       Log ($jobstepid, "child $pid did not actually exit in reapchildren, ignoring for now.");
1198       next;
1199     }
1200
1201     $children_reaped++;
1202     my $elapsed = time - $proc{$pid}->{time};
1203     my $Jobstep = $jobstep[$jobstepid];
1204
1205     my $exitvalue = $childstatus >> 8;
1206     my $exitinfo = "exit ".exit_status_s($childstatus);
1207     $Jobstep->{'arvados_task'}->reload;
1208     my $task_success = $Jobstep->{'arvados_task'}->{success};
1209
1210     Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1211
1212     if (!defined $task_success) {
1213       # task did not indicate one way or the other --> fail
1214       Log($jobstepid, sprintf(
1215             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1216             exit_status_s($childstatus)));
1217       $Jobstep->{'arvados_task'}->{success} = 0;
1218       $Jobstep->{'arvados_task'}->save;
1219       $task_success = 0;
1220     }
1221
1222     if (!$task_success)
1223     {
1224       my $temporary_fail;
1225       $temporary_fail ||= $Jobstep->{tempfail};
1226       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1227
1228       ++$thisround_failed;
1229       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1230
1231       # Check for signs of a failed or misconfigured node
1232       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1233           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1234         # Don't count this against jobstep failure thresholds if this
1235         # node is already suspected faulty and srun exited quickly
1236         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1237             $elapsed < 5) {
1238           Log ($jobstepid, "blaming failure on suspect node " .
1239                $slot[$proc{$pid}->{slot}]->{node}->{name});
1240           $temporary_fail ||= 1;
1241         }
1242         ban_node_by_slot($proc{$pid}->{slot});
1243       }
1244
1245       Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1246                                ++$Jobstep->{'failures'},
1247                                $temporary_fail ? 'temporary' : 'permanent',
1248                                $elapsed));
1249
1250       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1251         # Give up on this task, and the whole job
1252         $main::success = 0;
1253       }
1254       # Put this task back on the todo queue
1255       push @jobstep_todo, $jobstepid;
1256       $Job->{'tasks_summary'}->{'failed'}++;
1257     }
1258     else
1259     {
1260       ++$thisround_succeeded;
1261       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1262       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1263       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1264       push @jobstep_done, $jobstepid;
1265       Log ($jobstepid, "success in $elapsed seconds");
1266     }
1267     $Jobstep->{exitcode} = $childstatus;
1268     $Jobstep->{finishtime} = time;
1269     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1270     $Jobstep->{'arvados_task'}->save;
1271     process_stderr ($jobstepid, $task_success);
1272     Log ($jobstepid, sprintf("task output (%d bytes): %s",
1273                              length($Jobstep->{'arvados_task'}->{output}),
1274                              $Jobstep->{'arvados_task'}->{output}));
1275
1276     close $reader{$jobstepid};
1277     delete $reader{$jobstepid};
1278     delete $slot[$proc{$pid}->{slot}]->{pid};
1279     push @freeslot, $proc{$pid}->{slot};
1280     delete $proc{$pid};
1281
1282     if ($task_success) {
1283       # Load new tasks
1284       my $newtask_list = [];
1285       my $newtask_results;
1286       do {
1287         $newtask_results = api_call(
1288           "job_tasks/list",
1289           'where' => {
1290             'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1291           },
1292           'order' => 'qsequence',
1293           'offset' => scalar(@$newtask_list),
1294             );
1295         push(@$newtask_list, @{$newtask_results->{items}});
1296       } while (@{$newtask_results->{items}});
1297       foreach my $arvados_task (@$newtask_list) {
1298         my $jobstep = {
1299           'level' => $arvados_task->{'sequence'},
1300           'failures' => 0,
1301           'arvados_task' => $arvados_task
1302         };
1303         push @jobstep, $jobstep;
1304         push @jobstep_todo, $#jobstep;
1305       }
1306     }
1307     $progress_is_dirty = 1;
1308   }
1309
1310   return $children_reaped;
1311 }
1312
1313 sub check_refresh_wanted
1314 {
1315   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1316   if (@stat && $stat[9] > $latest_refresh) {
1317     $latest_refresh = scalar time;
1318     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1319     for my $attr ('cancelled_at',
1320                   'cancelled_by_user_uuid',
1321                   'cancelled_by_client_uuid',
1322                   'state') {
1323       $Job->{$attr} = $Job2->{$attr};
1324     }
1325     if ($Job->{'state'} ne "Running") {
1326       if ($Job->{'state'} eq "Cancelled") {
1327         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1328       } else {
1329         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1330       }
1331       $main::success = 0;
1332       $main::please_freeze = 1;
1333     }
1334   }
1335 }
1336
1337 sub check_squeue
1338 {
1339   my $last_squeue_check = $squeue_checked;
1340
1341   # Do not call `squeue` or check the kill list more than once every
1342   # 15 seconds.
1343   return if $last_squeue_check > time - 15;
1344   $squeue_checked = time;
1345
1346   # Look for children from which we haven't received stderr data since
1347   # the last squeue check. If no such children exist, all procs are
1348   # alive and there's no need to even look at squeue.
1349   #
1350   # As long as the crunchstat poll interval (10s) is shorter than the
1351   # squeue check interval (15s) this should make the squeue check an
1352   # infrequent event.
1353   my $silent_procs = 0;
1354   for my $js (map {$jobstep[$_->{jobstep}]} values %proc)
1355   {
1356     if (!exists($js->{stderr_at}))
1357     {
1358       $js->{stderr_at} = 0;
1359     }
1360     if ($js->{stderr_at} < $last_squeue_check)
1361     {
1362       $silent_procs++;
1363     }
1364   }
1365   return if $silent_procs == 0;
1366
1367   # use killem() on procs whose killtime is reached
1368   while (my ($pid, $procinfo) = each %proc)
1369   {
1370     my $js = $jobstep[$procinfo->{jobstep}];
1371     if (exists $procinfo->{killtime}
1372         && $procinfo->{killtime} <= time
1373         && $js->{stderr_at} < $last_squeue_check)
1374     {
1375       my $sincewhen = "";
1376       if ($js->{stderr_at}) {
1377         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1378       }
1379       Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1380       killem ($pid);
1381     }
1382   }
1383
1384   if (!$have_slurm)
1385   {
1386     # here is an opportunity to check for mysterious problems with local procs
1387     return;
1388   }
1389
1390   # Get a list of steps still running.  Note: squeue(1) says --steps
1391   # selects a format (which we override anyway) and allows us to
1392   # specify which steps we're interested in (which we don't).
1393   # Importantly, it also changes the meaning of %j from "job name" to
1394   # "step name" and (although this isn't mentioned explicitly in the
1395   # docs) switches from "one line per job" mode to "one line per step"
1396   # mode. Without it, we'd just get a list of one job, instead of a
1397   # list of N steps.
1398   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1399   if ($? != 0)
1400   {
1401     Log(undef, "warning: squeue exit status $? ($!)");
1402     return;
1403   }
1404   chop @squeue;
1405
1406   # which of my jobsteps are running, according to squeue?
1407   my %ok;
1408   for my $jobstepname (@squeue)
1409   {
1410     $ok{$jobstepname} = 1;
1411   }
1412
1413   # Check for child procs >60s old and not mentioned by squeue.
1414   while (my ($pid, $procinfo) = each %proc)
1415   {
1416     my $js = $jobstep[$procinfo->{jobstep}];
1417     if ($procinfo->{time} < time - 60
1418         && $procinfo->{jobstepname}
1419         && !exists $ok{$procinfo->{jobstepname}}
1420         && !exists $procinfo->{killtime})
1421     {
1422       # According to slurm, this task has ended (successfully or not)
1423       # -- but our srun child hasn't exited. First we must wait (30
1424       # seconds) in case this is just a race between communication
1425       # channels. Then, if our srun child process still hasn't
1426       # terminated, we'll conclude some slurm communication
1427       # error/delay has caused the task to die without notifying srun,
1428       # and we'll kill srun ourselves.
1429       $procinfo->{killtime} = time + 30;
1430       Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1431     }
1432   }
1433 }
1434
1435
1436 sub release_allocation
1437 {
1438   if ($have_slurm)
1439   {
1440     Log (undef, "release job allocation");
1441     system "scancel $ENV{SLURM_JOB_ID}";
1442   }
1443 }
1444
1445
1446 sub readfrompipes
1447 {
1448   my $gotsome = 0;
1449   my %fd_job;
1450   my $sel = IO::Select->new();
1451   foreach my $job (keys %reader)
1452   {
1453     my $fd = $reader{$job};
1454     $sel->add($fd);
1455     $fd_job{$fd} = $job;
1456   }
1457   # select on all reader fds with 0.1s timeout
1458   my @ready_fds = $sel->can_read(0.1);
1459   foreach my $fd (@ready_fds)
1460   {
1461     my $buf;
1462     if (0 < sysread ($fd, $buf, 65536))
1463     {
1464       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1465       my $job = $fd_job{$fd};
1466       $jobstep[$job]->{stderr_at} = time;
1467       $jobstep[$job]->{stderr} .= $buf;
1468
1469       # Consume everything up to the last \n
1470       preprocess_stderr ($job);
1471
1472       if (length ($jobstep[$job]->{stderr}) > 16384)
1473       {
1474         # If we get a lot of stderr without a newline, chop off the
1475         # front to avoid letting our buffer grow indefinitely.
1476         substr ($jobstep[$job]->{stderr},
1477                 0, length($jobstep[$job]->{stderr}) - 8192) = "";
1478       }
1479       $gotsome = 1;
1480     }
1481   }
1482   return $gotsome;
1483 }
1484
1485
1486 sub preprocess_stderr
1487 {
1488   my $job = shift;
1489
1490   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1491     my $line = $1;
1492     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1493     Log ($job, "stderr $line");
1494     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1495       # whoa.
1496       $main::please_freeze = 1;
1497     }
1498     elsif ($line =~ /srun: error: Node failure on/) {
1499       my $job_slot_index = $jobstep[$job]->{slotindex};
1500       $slot[$job_slot_index]->{node}->{fail_count}++;
1501       $jobstep[$job]->{tempfail} = 1;
1502       ban_node_by_slot($job_slot_index);
1503     }
1504     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1505       $jobstep[$job]->{tempfail} = 1;
1506       ban_node_by_slot($jobstep[$job]->{slotindex});
1507     }
1508     elsif ($line =~ /arvados\.errors\.Keep/) {
1509       $jobstep[$job]->{tempfail} = 1;
1510     }
1511   }
1512 }
1513
1514
1515 sub process_stderr
1516 {
1517   my $job = shift;
1518   my $task_success = shift;
1519   preprocess_stderr ($job);
1520
1521   map {
1522     Log ($job, "stderr $_");
1523   } split ("\n", $jobstep[$job]->{stderr});
1524 }
1525
1526 sub fetch_block
1527 {
1528   my $hash = shift;
1529   my $keep;
1530   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1531     Log(undef, "fetch_block run error from arv-get $hash: $!");
1532     return undef;
1533   }
1534   my $output_block = "";
1535   while (1) {
1536     my $buf;
1537     my $bytes = sysread($keep, $buf, 1024 * 1024);
1538     if (!defined $bytes) {
1539       Log(undef, "fetch_block read error from arv-get: $!");
1540       $output_block = undef;
1541       last;
1542     } elsif ($bytes == 0) {
1543       # sysread returns 0 at the end of the pipe.
1544       last;
1545     } else {
1546       # some bytes were read into buf.
1547       $output_block .= $buf;
1548     }
1549   }
1550   close $keep;
1551   if ($?) {
1552     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1553     $output_block = undef;
1554   }
1555   return $output_block;
1556 }
1557
1558 # Create a collection by concatenating the output of all tasks (each
1559 # task's output is either a manifest fragment, a locator for a
1560 # manifest fragment stored in Keep, or nothing at all). Return the
1561 # portable_data_hash of the new collection.
1562 sub create_output_collection
1563 {
1564   Log (undef, "collate");
1565
1566   my ($child_out, $child_in);
1567   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1568 import arvados
1569 import sys
1570 print (arvados.api("v1").collections().
1571        create(body={"manifest_text": sys.stdin.read()}).
1572        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1573 }, retry_count());
1574
1575   my $task_idx = -1;
1576   my $manifest_size = 0;
1577   for (@jobstep)
1578   {
1579     ++$task_idx;
1580     my $output = $_->{'arvados_task'}->{output};
1581     next if (!defined($output));
1582     my $next_write;
1583     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1584       $next_write = fetch_block($output);
1585     } else {
1586       $next_write = $output;
1587     }
1588     if (defined($next_write)) {
1589       if (!defined(syswrite($child_in, $next_write))) {
1590         # There's been an error writing.  Stop the loop.
1591         # We'll log details about the exit code later.
1592         last;
1593       } else {
1594         $manifest_size += length($next_write);
1595       }
1596     } else {
1597       my $uuid = $_->{'arvados_task'}->{'uuid'};
1598       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1599       $main::success = 0;
1600     }
1601   }
1602   close($child_in);
1603   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1604
1605   my $joboutput;
1606   my $s = IO::Select->new($child_out);
1607   if ($s->can_read(120)) {
1608     sysread($child_out, $joboutput, 1024 * 1024);
1609     waitpid($pid, 0);
1610     if ($?) {
1611       Log(undef, "output collection creation exited " . exit_status_s($?));
1612       $joboutput = undef;
1613     } else {
1614       chomp($joboutput);
1615     }
1616   } else {
1617     Log (undef, "timed out while creating output collection");
1618     foreach my $signal (2, 2, 2, 15, 15, 9) {
1619       kill($signal, $pid);
1620       last if waitpid($pid, WNOHANG) == -1;
1621       sleep(1);
1622     }
1623   }
1624   close($child_out);
1625
1626   return $joboutput;
1627 }
1628
1629 # Calls create_output_collection, logs the result, and returns it.
1630 # If that was successful, save that as the output in the job record.
1631 sub save_output_collection {
1632   my $collated_output = create_output_collection();
1633
1634   if (!$collated_output) {
1635     Log(undef, "Failed to write output collection");
1636   }
1637   else {
1638     Log(undef, "job output $collated_output");
1639     $Job->update_attributes('output' => $collated_output);
1640   }
1641   return $collated_output;
1642 }
1643
1644 sub killem
1645 {
1646   foreach (@_)
1647   {
1648     my $sig = 2;                # SIGINT first
1649     if (exists $proc{$_}->{"sent_$sig"} &&
1650         time - $proc{$_}->{"sent_$sig"} > 4)
1651     {
1652       $sig = 15;                # SIGTERM if SIGINT doesn't work
1653     }
1654     if (exists $proc{$_}->{"sent_$sig"} &&
1655         time - $proc{$_}->{"sent_$sig"} > 4)
1656     {
1657       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1658     }
1659     if (!exists $proc{$_}->{"sent_$sig"})
1660     {
1661       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1662       kill $sig, $_;
1663       select (undef, undef, undef, 0.1);
1664       if ($sig == 2)
1665       {
1666         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1667       }
1668       $proc{$_}->{"sent_$sig"} = time;
1669       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1670     }
1671   }
1672 }
1673
1674
1675 sub fhbits
1676 {
1677   my($bits);
1678   for (@_) {
1679     vec($bits,fileno($_),1) = 1;
1680   }
1681   $bits;
1682 }
1683
1684
1685 # Send log output to Keep via arv-put.
1686 #
1687 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1688 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1689 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1690 # $log_pipe_pid is the pid of the arv-put subprocess.
1691 #
1692 # The only functions that should access these variables directly are:
1693 #
1694 # log_writer_start($logfilename)
1695 #     Starts an arv-put pipe, reading data on stdin and writing it to
1696 #     a $logfilename file in an output collection.
1697 #
1698 # log_writer_read_output([$timeout])
1699 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1700 #     Passes $timeout to the select() call, with a default of 0.01.
1701 #     Returns the result of the last read() call on $log_pipe_out, or
1702 #     -1 if read() wasn't called because select() timed out.
1703 #     Only other log_writer_* functions should need to call this.
1704 #
1705 # log_writer_send($txt)
1706 #     Writes $txt to the output log collection.
1707 #
1708 # log_writer_finish()
1709 #     Closes the arv-put pipe and returns the output that it produces.
1710 #
1711 # log_writer_is_active()
1712 #     Returns a true value if there is currently a live arv-put
1713 #     process, false otherwise.
1714 #
1715 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1716     $log_pipe_pid);
1717
1718 sub log_writer_start($)
1719 {
1720   my $logfilename = shift;
1721   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1722                         'arv-put',
1723                         '--stream',
1724                         '--retries', '3',
1725                         '--filename', $logfilename,
1726                         '-');
1727   $log_pipe_out_buf = "";
1728   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1729 }
1730
1731 sub log_writer_read_output {
1732   my $timeout = shift || 0.01;
1733   my $read = -1;
1734   while ($read && $log_pipe_out_select->can_read($timeout)) {
1735     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1736                  length($log_pipe_out_buf));
1737   }
1738   if (!defined($read)) {
1739     Log(undef, "error reading log manifest from arv-put: $!");
1740   }
1741   return $read;
1742 }
1743
1744 sub log_writer_send($)
1745 {
1746   my $txt = shift;
1747   print $log_pipe_in $txt;
1748   log_writer_read_output();
1749 }
1750
1751 sub log_writer_finish()
1752 {
1753   return unless $log_pipe_pid;
1754
1755   close($log_pipe_in);
1756
1757   my $logger_failed = 0;
1758   my $read_result = log_writer_read_output(120);
1759   if ($read_result == -1) {
1760     $logger_failed = -1;
1761     Log (undef, "timed out reading from 'arv-put'");
1762   } elsif ($read_result != 0) {
1763     $logger_failed = -2;
1764     Log(undef, "failed to read arv-put log manifest to EOF");
1765   }
1766
1767   waitpid($log_pipe_pid, 0);
1768   if ($?) {
1769     $logger_failed ||= $?;
1770     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1771   }
1772
1773   close($log_pipe_out);
1774   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1775   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1776       $log_pipe_out_select = undef;
1777
1778   return $arv_put_output;
1779 }
1780
1781 sub log_writer_is_active() {
1782   return $log_pipe_pid;
1783 }
1784
1785 sub Log                         # ($jobstep_id, $logmessage)
1786 {
1787   if ($_[1] =~ /\n/) {
1788     for my $line (split (/\n/, $_[1])) {
1789       Log ($_[0], $line);
1790     }
1791     return;
1792   }
1793   my $fh = select STDERR; $|=1; select $fh;
1794   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1795   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1796   $message .= "\n";
1797   my $datetime;
1798   if (log_writer_is_active() || -t STDERR) {
1799     my @gmtime = gmtime;
1800     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1801                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1802   }
1803   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1804
1805   if (log_writer_is_active()) {
1806     log_writer_send($datetime . " " . $message);
1807   }
1808 }
1809
1810
1811 sub croak
1812 {
1813   my ($package, $file, $line) = caller;
1814   my $message = "@_ at $file line $line\n";
1815   Log (undef, $message);
1816   freeze() if @jobstep_todo;
1817   create_output_collection() if @jobstep_todo;
1818   cleanup();
1819   save_meta();
1820   die;
1821 }
1822
1823
1824 sub cleanup
1825 {
1826   return unless $Job;
1827   if ($Job->{'state'} eq 'Cancelled') {
1828     $Job->update_attributes('finished_at' => scalar gmtime);
1829   } else {
1830     $Job->update_attributes('state' => 'Failed');
1831   }
1832 }
1833
1834
1835 sub save_meta
1836 {
1837   my $justcheckpoint = shift; # false if this will be the last meta saved
1838   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1839   return unless log_writer_is_active();
1840   my $log_manifest = log_writer_finish();
1841   return unless defined($log_manifest);
1842
1843   if ($Job->{log}) {
1844     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1845     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1846   }
1847
1848   my $log_coll = api_call(
1849     "collections/create", ensure_unique_name => 1, collection => {
1850       manifest_text => $log_manifest,
1851       owner_uuid => $Job->{owner_uuid},
1852       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1853     });
1854   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1855   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1856 }
1857
1858
1859 sub freeze_if_want_freeze
1860 {
1861   if ($main::please_freeze)
1862   {
1863     release_allocation();
1864     if (@_)
1865     {
1866       # kill some srun procs before freeze+stop
1867       map { $proc{$_} = {} } @_;
1868       while (%proc)
1869       {
1870         killem (keys %proc);
1871         select (undef, undef, undef, 0.1);
1872         my $died;
1873         while (($died = waitpid (-1, WNOHANG)) > 0)
1874         {
1875           delete $proc{$died};
1876         }
1877       }
1878     }
1879     freeze();
1880     create_output_collection();
1881     cleanup();
1882     save_meta();
1883     exit 1;
1884   }
1885 }
1886
1887
1888 sub freeze
1889 {
1890   Log (undef, "Freeze not implemented");
1891   return;
1892 }
1893
1894
1895 sub thaw
1896 {
1897   croak ("Thaw not implemented");
1898 }
1899
1900
1901 sub freezequote
1902 {
1903   my $s = shift;
1904   $s =~ s/\\/\\\\/g;
1905   $s =~ s/\n/\\n/g;
1906   return $s;
1907 }
1908
1909
1910 sub freezeunquote
1911 {
1912   my $s = shift;
1913   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1914   return $s;
1915 }
1916
1917
1918 sub srun
1919 {
1920   my $srunargs = shift;
1921   my $execargs = shift;
1922   my $opts = shift || {};
1923   my $stdin = shift;
1924   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1925
1926   $Data::Dumper::Terse = 1;
1927   $Data::Dumper::Indent = 0;
1928   my $show_cmd = Dumper($args);
1929   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1930   $show_cmd =~ s/\n/ /g;
1931   if ($opts->{fork}) {
1932     Log(undef, "starting: $show_cmd");
1933   } else {
1934     # This is a child process: parent is in charge of reading our
1935     # stderr and copying it to Log() if needed.
1936     warn "starting: $show_cmd\n";
1937   }
1938
1939   if (defined $stdin) {
1940     my $child = open STDIN, "-|";
1941     defined $child or die "no fork: $!";
1942     if ($child == 0) {
1943       print $stdin or die $!;
1944       close STDOUT or die $!;
1945       exit 0;
1946     }
1947   }
1948
1949   return system (@$args) if $opts->{fork};
1950
1951   exec @$args;
1952   warn "ENV size is ".length(join(" ",%ENV));
1953   die "exec failed: $!: @$args";
1954 }
1955
1956
1957 sub ban_node_by_slot {
1958   # Don't start any new jobsteps on this node for 60 seconds
1959   my $slotid = shift;
1960   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1961   $slot[$slotid]->{node}->{hold_count}++;
1962   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1963 }
1964
1965 sub must_lock_now
1966 {
1967   my ($lockfile, $error_message) = @_;
1968   open L, ">", $lockfile or croak("$lockfile: $!");
1969   if (!flock L, LOCK_EX|LOCK_NB) {
1970     croak("Can't lock $lockfile: $error_message\n");
1971   }
1972 }
1973
1974 sub find_docker_image {
1975   # Given a Keep locator, check to see if it contains a Docker image.
1976   # If so, return its stream name and Docker hash.
1977   # If not, return undef for both values.
1978   my $locator = shift;
1979   my ($streamname, $filename);
1980   my $image = api_call("collections/get", uuid => $locator);
1981   if ($image) {
1982     foreach my $line (split(/\n/, $image->{manifest_text})) {
1983       my @tokens = split(/\s+/, $line);
1984       next if (!@tokens);
1985       $streamname = shift(@tokens);
1986       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1987         if (defined($filename)) {
1988           return (undef, undef);  # More than one file in the Collection.
1989         } else {
1990           $filename = (split(/:/, $filedata, 3))[2];
1991         }
1992       }
1993     }
1994   }
1995   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1996     return ($streamname, $1);
1997   } else {
1998     return (undef, undef);
1999   }
2000 }
2001
2002 sub retry_count {
2003   # Calculate the number of times an operation should be retried,
2004   # assuming exponential backoff, and that we're willing to retry as
2005   # long as tasks have been running.  Enforce a minimum of 3 retries.
2006   my ($starttime, $endtime, $timediff, $retries);
2007   if (@jobstep) {
2008     $starttime = $jobstep[0]->{starttime};
2009     $endtime = $jobstep[-1]->{finishtime};
2010   }
2011   if (!defined($starttime)) {
2012     $timediff = 0;
2013   } elsif (!defined($endtime)) {
2014     $timediff = time - $starttime;
2015   } else {
2016     $timediff = ($endtime - $starttime) - (time - $endtime);
2017   }
2018   if ($timediff > 0) {
2019     $retries = int(log($timediff) / log(2));
2020   } else {
2021     $retries = 1;  # Use the minimum.
2022   }
2023   return ($retries > 3) ? $retries : 3;
2024 }
2025
2026 sub retry_op {
2027   # Pass in two function references.
2028   # This method will be called with the remaining arguments.
2029   # If it dies, retry it with exponential backoff until it succeeds,
2030   # or until the current retry_count is exhausted.  After each failure
2031   # that can be retried, the second function will be called with
2032   # the current try count (0-based), next try time, and error message.
2033   my $operation = shift;
2034   my $retry_callback = shift;
2035   my $retries = retry_count();
2036   foreach my $try_count (0..$retries) {
2037     my $next_try = time + (2 ** $try_count);
2038     my $result = eval { $operation->(@_); };
2039     if (!$@) {
2040       return $result;
2041     } elsif ($try_count < $retries) {
2042       $retry_callback->($try_count, $next_try, $@);
2043       my $sleep_time = $next_try - time;
2044       sleep($sleep_time) if ($sleep_time > 0);
2045     }
2046   }
2047   # Ensure the error message ends in a newline, so Perl doesn't add
2048   # retry_op's line number to it.
2049   chomp($@);
2050   die($@ . "\n");
2051 }
2052
2053 sub api_call {
2054   # Pass in a /-separated API method name, and arguments for it.
2055   # This function will call that method, retrying as needed until
2056   # the current retry_count is exhausted, with a log on the first failure.
2057   my $method_name = shift;
2058   my $log_api_retry = sub {
2059     my ($try_count, $next_try_at, $errmsg) = @_;
2060     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2061     $errmsg =~ s/\s/ /g;
2062     $errmsg =~ s/\s+$//;
2063     my $retry_msg;
2064     if ($next_try_at < time) {
2065       $retry_msg = "Retrying.";
2066     } else {
2067       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2068       $retry_msg = "Retrying at $next_try_fmt.";
2069     }
2070     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2071   };
2072   my $method = $arv;
2073   foreach my $key (split(/\//, $method_name)) {
2074     $method = $method->{$key};
2075   }
2076   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2077 }
2078
2079 sub exit_status_s {
2080   # Given a $?, return a human-readable exit code string like "0" or
2081   # "1" or "0 with signal 1" or "1 with signal 11".
2082   my $exitcode = shift;
2083   my $s = $exitcode >> 8;
2084   if ($exitcode & 0x7f) {
2085     $s .= " with signal " . ($exitcode & 0x7f);
2086   }
2087   if ($exitcode & 0x80) {
2088     $s .= " with core dump";
2089   }
2090   return $s;
2091 }
2092
2093 sub handle_readall {
2094   # Pass in a glob reference to a file handle.
2095   # Read all its contents and return them as a string.
2096   my $fh_glob_ref = shift;
2097   local $/ = undef;
2098   return <$fh_glob_ref>;
2099 }
2100
2101 sub tar_filename_n {
2102   my $n = shift;
2103   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2104 }
2105
2106 sub add_git_archive {
2107   # Pass in a git archive command as a string or list, a la system().
2108   # This method will save its output to be included in the archive sent to the
2109   # build script.
2110   my $git_input;
2111   $git_tar_count++;
2112   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2113     croak("Failed to save git archive: $!");
2114   }
2115   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2116   close($git_input);
2117   waitpid($git_pid, 0);
2118   close(GIT_ARCHIVE);
2119   if ($?) {
2120     croak("Failed to save git archive: git exited " . exit_status_s($?));
2121   }
2122 }
2123
2124 sub combined_git_archive {
2125   # Combine all saved tar archives into a single archive, then return its
2126   # contents in a string.  Return undef if no archives have been saved.
2127   if ($git_tar_count < 1) {
2128     return undef;
2129   }
2130   my $base_tar_name = tar_filename_n(1);
2131   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2132     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2133     if ($tar_exit != 0) {
2134       croak("Error preparing build archive: tar -A exited " .
2135             exit_status_s($tar_exit));
2136     }
2137   }
2138   if (!open(GIT_TAR, "<", $base_tar_name)) {
2139     croak("Could not open build archive: $!");
2140   }
2141   my $tar_contents = handle_readall(\*GIT_TAR);
2142   close(GIT_TAR);
2143   return $tar_contents;
2144 }
2145
2146 sub set_nonblocking {
2147   my $fh = shift;
2148   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2149   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2150 }
2151
2152 __DATA__
2153 #!/usr/bin/env perl
2154 #
2155 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2156 # server invokes this script on individual compute nodes, or localhost if we're
2157 # running a job locally.  It gets called in two modes:
2158 #
2159 # * No arguments: Installation mode.  Read a tar archive from the DATA
2160 #   file handle; it includes the Crunch script's source code, and
2161 #   maybe SDKs as well.  Those should be installed in the proper
2162 #   locations.  This runs outside of any Docker container, so don't try to
2163 #   introspect Crunch's runtime environment.
2164 #
2165 # * With arguments: Crunch script run mode.  This script should set up the
2166 #   environment, then run the command specified in the arguments.  This runs
2167 #   inside any Docker container.
2168
2169 use Fcntl ':flock';
2170 use File::Path qw( make_path remove_tree );
2171 use POSIX qw(getcwd);
2172
2173 use constant TASK_TEMPFAIL => 111;
2174
2175 # Map SDK subdirectories to the path environments they belong to.
2176 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2177
2178 my $destdir = $ENV{"CRUNCH_SRC"};
2179 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2180 my $repo = $ENV{"CRUNCH_SRC_URL"};
2181 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2182 my $job_work = $ENV{"JOB_WORK"};
2183 my $task_work = $ENV{"TASK_WORK"};
2184
2185 open(STDOUT_ORIG, ">&", STDOUT);
2186 open(STDERR_ORIG, ">&", STDERR);
2187
2188 for my $dir ($destdir, $job_work, $task_work) {
2189   if ($dir) {
2190     make_path $dir;
2191     -e $dir or die "Failed to create temporary directory ($dir): $!";
2192   }
2193 }
2194
2195 if ($task_work) {
2196   remove_tree($task_work, {keep_root => 1});
2197 }
2198
2199 ### Crunch script run mode
2200 if (@ARGV) {
2201   # We want to do routine logging during task 0 only.  This gives the user
2202   # the information they need, but avoids repeating the information for every
2203   # task.
2204   my $Log;
2205   if ($ENV{TASK_SEQUENCE} eq "0") {
2206     $Log = sub {
2207       my $msg = shift;
2208       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2209     };
2210   } else {
2211     $Log = sub { };
2212   }
2213
2214   my $python_src = "$install_dir/python";
2215   my $venv_dir = "$job_work/.arvados.venv";
2216   my $venv_built = -e "$venv_dir/bin/activate";
2217   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2218     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2219                  "--python=python2.7", $venv_dir);
2220     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2221     $venv_built = 1;
2222     $Log->("Built Python SDK virtualenv");
2223   }
2224
2225   my @pysdk_version_cmd = ("python", "-c",
2226     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2227   if ($venv_built) {
2228     $Log->("Running in Python SDK virtualenv");
2229     @pysdk_version_cmd = ();
2230     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2231     @ARGV = ("/bin/sh", "-ec",
2232              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2233   } elsif (-d $python_src) {
2234     $Log->("Warning: virtualenv not found inside Docker container default " .
2235            "\$PATH. Can't install Python SDK.");
2236   }
2237
2238   if (@pysdk_version_cmd) {
2239     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2240     my $pysdk_version = <$pysdk_version_pipe>;
2241     close($pysdk_version_pipe);
2242     if ($? == 0) {
2243       chomp($pysdk_version);
2244       $Log->("Using Arvados SDK version $pysdk_version");
2245     } else {
2246       # A lot could've gone wrong here, but pretty much all of it means that
2247       # Python won't be able to load the Arvados SDK.
2248       $Log->("Warning: Arvados SDK not found");
2249     }
2250   }
2251
2252   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2253     my $sdk_path = "$install_dir/$sdk_dir";
2254     if (-d $sdk_path) {
2255       if ($ENV{$sdk_envkey}) {
2256         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2257       } else {
2258         $ENV{$sdk_envkey} = $sdk_path;
2259       }
2260       $Log->("Arvados SDK added to %s", $sdk_envkey);
2261     }
2262   }
2263
2264   exec(@ARGV);
2265   die "Cannot exec `@ARGV`: $!";
2266 }
2267
2268 ### Installation mode
2269 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2270 flock L, LOCK_EX;
2271 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2272   # This exact git archive (source + arvados sdk) is already installed
2273   # here, so there's no need to reinstall it.
2274
2275   # We must consume our DATA section, though: otherwise the process
2276   # feeding it to us will get SIGPIPE.
2277   my $buf;
2278   while (read(DATA, $buf, 65536)) { }
2279
2280   exit(0);
2281 }
2282
2283 unlink "$destdir.archive_hash";
2284 mkdir $destdir;
2285
2286 do {
2287   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2288   local $SIG{PIPE} = "IGNORE";
2289   warn "Extracting archive: $archive_hash\n";
2290   # --ignore-zeros is necessary sometimes: depending on how much NUL
2291   # padding tar -A put on our combined archive (which in turn depends
2292   # on the length of the component archives) tar without
2293   # --ignore-zeros will exit before consuming stdin and cause close()
2294   # to fail on the resulting SIGPIPE.
2295   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2296     die "Error launching 'tar -xC $destdir': $!";
2297   }
2298   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2299   # get SIGPIPE.  We must feed it data incrementally.
2300   my $tar_input;
2301   while (read(DATA, $tar_input, 65536)) {
2302     print TARX $tar_input;
2303   }
2304   if(!close(TARX)) {
2305     die "'tar -xC $destdir' exited $?: $!";
2306   }
2307 };
2308
2309 mkdir $install_dir;
2310
2311 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2312 if (-d $sdk_root) {
2313   foreach my $sdk_lang (("python",
2314                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2315     if (-d "$sdk_root/$sdk_lang") {
2316       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2317         die "Failed to install $sdk_lang SDK: $!";
2318       }
2319     }
2320   }
2321 }
2322
2323 my $python_dir = "$install_dir/python";
2324 if ((-d $python_dir) and can_run("python2.7")) {
2325   open(my $egg_info_pipe, "-|",
2326        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2327   my @egg_info_errors = <$egg_info_pipe>;
2328   close($egg_info_pipe);
2329
2330   if ($?) {
2331     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2332       # egg_info apparently failed because it couldn't ask git for a build tag.
2333       # Specify no build tag.
2334       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2335       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2336       close($pysdk_cfg);
2337     } else {
2338       my $egg_info_exit = $? >> 8;
2339       foreach my $errline (@egg_info_errors) {
2340         warn $errline;
2341       }
2342       warn "python setup.py egg_info failed: exit $egg_info_exit";
2343       exit ($egg_info_exit || 1);
2344     }
2345   }
2346 }
2347
2348 # Hide messages from the install script (unless it fails: shell_or_die
2349 # will show $destdir.log in that case).
2350 open(STDOUT, ">>", "$destdir.log");
2351 open(STDERR, ">&", STDOUT);
2352
2353 if (-e "$destdir/crunch_scripts/install") {
2354     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2355 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2356     # Old version
2357     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2358 } elsif (-e "./install.sh") {
2359     shell_or_die (undef, "./install.sh", $install_dir);
2360 }
2361
2362 if ($archive_hash) {
2363     unlink "$destdir.archive_hash.new";
2364     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2365     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2366 }
2367
2368 close L;
2369
2370 sub can_run {
2371   my $command_name = shift;
2372   open(my $which, "-|", "which", $command_name);
2373   while (<$which>) { }
2374   close($which);
2375   return ($? == 0);
2376 }
2377
2378 sub shell_or_die
2379 {
2380   my $exitcode = shift;
2381
2382   if ($ENV{"DEBUG"}) {
2383     print STDERR "@_\n";
2384   }
2385   if (system (@_) != 0) {
2386     my $err = $!;
2387     my $code = $?;
2388     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2389     open STDERR, ">&STDERR_ORIG";
2390     system ("cat $destdir.log >&2");
2391     warn "@_ failed ($err): $exitstatus";
2392     if (defined($exitcode)) {
2393       exit $exitcode;
2394     }
2395     else {
2396       exit (($code >> 8) || 1);
2397     }
2398   }
2399 }
2400
2401 __DATA__