7868: Allow admin to add arguments to "docker run" commands.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 my $docker_run_args = "";
131 GetOptions('force-unlock' => \$force_unlock,
132            'git-dir=s' => \$git_dir,
133            'job=s' => \$jobspec,
134            'job-api-token=s' => \$job_api_token,
135            'no-clear-tmp' => \$no_clear_tmp,
136            'resume-stash=s' => \$resume_stash,
137            'docker-bin=s' => \$docker_bin,
138            'docker-run-args=s' => \$docker_run_args,
139     );
140
141 if (defined $job_api_token) {
142   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
143 }
144
145 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
146
147
148 $SIG{'USR1'} = sub
149 {
150   $main::ENV{CRUNCH_DEBUG} = 1;
151 };
152 $SIG{'USR2'} = sub
153 {
154   $main::ENV{CRUNCH_DEBUG} = 0;
155 };
156
157 my $arv = Arvados->new('apiVersion' => 'v1');
158
159 my $Job;
160 my $job_id;
161 my $dbh;
162 my $sth;
163 my @jobstep;
164
165 my $local_job;
166 if ($jobspec =~ /^[-a-z\d]+$/)
167 {
168   # $jobspec is an Arvados UUID, not a JSON job specification
169   $Job = api_call("jobs/get", uuid => $jobspec);
170   $local_job = 0;
171 }
172 else
173 {
174   $local_job = JSON::decode_json($jobspec);
175 }
176
177
178 # Make sure our workers (our slurm nodes, localhost, or whatever) are
179 # at least able to run basic commands: they aren't down or severely
180 # misconfigured.
181 my $cmd = ['true'];
182 if (($Job || $local_job)->{docker_image_locator}) {
183   $cmd = [$docker_bin, 'ps', '-q'];
184 }
185 Log(undef, "Sanity check is `@$cmd`");
186 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
187      $cmd,
188      {fork => 1});
189 if ($? != 0) {
190   Log(undef, "Sanity check failed: ".exit_status_s($?));
191   exit EX_TEMPFAIL;
192 }
193 Log(undef, "Sanity check OK");
194
195
196 my $User = api_call("users/current");
197
198 if (!$local_job) {
199   if (!$force_unlock) {
200     # Claim this job, and make sure nobody else does
201     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
202     if ($@) {
203       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
204       exit EX_TEMPFAIL;
205     };
206   }
207 }
208 else
209 {
210   if (!$resume_stash)
211   {
212     map { croak ("No $_ specified") unless $local_job->{$_} }
213     qw(script script_version script_parameters);
214   }
215
216   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
217   $local_job->{'started_at'} = gmtime;
218   $local_job->{'state'} = 'Running';
219
220   $Job = api_call("jobs/create", job => $local_job);
221 }
222 $job_id = $Job->{'uuid'};
223
224 my $keep_logfile = $job_id . '.log.txt';
225 log_writer_start($keep_logfile);
226
227 $Job->{'runtime_constraints'} ||= {};
228 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
229 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
230
231 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
232 if ($? == 0) {
233   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
234   chomp($gem_versions);
235   chop($gem_versions);  # Closing parentheses
236 } else {
237   $gem_versions = "";
238 }
239 Log(undef,
240     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
241
242 Log (undef, "check slurm allocation");
243 my @slot;
244 my @node;
245 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
246 my @sinfo;
247 if (!$have_slurm)
248 {
249   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
250   push @sinfo, "$localcpus localhost";
251 }
252 if (exists $ENV{SLURM_NODELIST})
253 {
254   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
255 }
256 foreach (@sinfo)
257 {
258   my ($ncpus, $slurm_nodelist) = split;
259   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
260
261   my @nodelist;
262   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
263   {
264     my $nodelist = $1;
265     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
266     {
267       my $ranges = $1;
268       foreach (split (",", $ranges))
269       {
270         my ($a, $b);
271         if (/(\d+)-(\d+)/)
272         {
273           $a = $1;
274           $b = $2;
275         }
276         else
277         {
278           $a = $_;
279           $b = $_;
280         }
281         push @nodelist, map {
282           my $n = $nodelist;
283           $n =~ s/\[[-,\d]+\]/$_/;
284           $n;
285         } ($a..$b);
286       }
287     }
288     else
289     {
290       push @nodelist, $nodelist;
291     }
292   }
293   foreach my $nodename (@nodelist)
294   {
295     Log (undef, "node $nodename - $ncpus slots");
296     my $node = { name => $nodename,
297                  ncpus => $ncpus,
298                  # The number of consecutive times a task has been dispatched
299                  # to this node and failed.
300                  losing_streak => 0,
301                  # The number of consecutive times that SLURM has reported
302                  # a node failure since the last successful task.
303                  fail_count => 0,
304                  # Don't dispatch work to this node until this time
305                  # (in seconds since the epoch) has passed.
306                  hold_until => 0 };
307     foreach my $cpu (1..$ncpus)
308     {
309       push @slot, { node => $node,
310                     cpu => $cpu };
311     }
312   }
313   push @node, @nodelist;
314 }
315
316
317
318 # Ensure that we get one jobstep running on each allocated node before
319 # we start overloading nodes with concurrent steps
320
321 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
322
323
324 $Job->update_attributes(
325   'tasks_summary' => { 'failed' => 0,
326                        'todo' => 1,
327                        'running' => 0,
328                        'done' => 0 });
329
330 Log (undef, "start");
331 $SIG{'INT'} = sub { $main::please_freeze = 1; };
332 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
333 $SIG{'TERM'} = \&croak;
334 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
335 $SIG{'ALRM'} = sub { $main::please_info = 1; };
336 $SIG{'CONT'} = sub { $main::please_continue = 1; };
337 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
338
339 $main::please_freeze = 0;
340 $main::please_info = 0;
341 $main::please_continue = 0;
342 $main::please_refresh = 0;
343 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
344
345 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
346 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
347 $ENV{"JOB_UUID"} = $job_id;
348
349
350 my @jobstep_todo = ();
351 my @jobstep_done = ();
352 my @jobstep_tomerge = ();
353 my $jobstep_tomerge_level = 0;
354 my $squeue_checked = 0;
355 my $latest_refresh = scalar time;
356
357
358
359 if (defined $Job->{thawedfromkey})
360 {
361   thaw ($Job->{thawedfromkey});
362 }
363 else
364 {
365   my $first_task = api_call("job_tasks/create", job_task => {
366     'job_uuid' => $Job->{'uuid'},
367     'sequence' => 0,
368     'qsequence' => 0,
369     'parameters' => {},
370   });
371   push @jobstep, { 'level' => 0,
372                    'failures' => 0,
373                    'arvados_task' => $first_task,
374                  };
375   push @jobstep_todo, 0;
376 }
377
378
379 if (!$have_slurm)
380 {
381   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
382 }
383
384 my $build_script = handle_readall(\*DATA);
385 my $nodelist = join(",", @node);
386 my $git_tar_count = 0;
387
388 if (!defined $no_clear_tmp) {
389   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
390   Log (undef, "Clean work dirs");
391
392   my $cleanpid = fork();
393   if ($cleanpid == 0)
394   {
395     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
396     # Then clean up work directories.
397     # TODO: When #5036 is done and widely deployed, we can limit mount's
398     # -t option to simply fuse.keep.
399     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
400           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
401     exit (1);
402   }
403   while (1)
404   {
405     last if $cleanpid == waitpid (-1, WNOHANG);
406     freeze_if_want_freeze ($cleanpid);
407     select (undef, undef, undef, 0.1);
408   }
409   if ($?) {
410     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
411     exit(EX_RETRY_UNLOCKED);
412   }
413 }
414
415 # If this job requires a Docker image, install that.
416 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
417 if ($docker_locator = $Job->{docker_image_locator}) {
418   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
419   if (!$docker_hash)
420   {
421     croak("No Docker image hash found from locator $docker_locator");
422   }
423   $docker_stream =~ s/^\.//;
424   my $docker_install_script = qq{
425 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
426     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
427 fi
428 };
429   my $docker_pid = fork();
430   if ($docker_pid == 0)
431   {
432     srun (["srun", "--nodelist=" . join(',', @node)],
433           ["/bin/sh", "-ec", $docker_install_script]);
434     exit ($?);
435   }
436   while (1)
437   {
438     last if $docker_pid == waitpid (-1, WNOHANG);
439     freeze_if_want_freeze ($docker_pid);
440     select (undef, undef, undef, 0.1);
441   }
442   if ($? != 0)
443   {
444     croak("Installing Docker image from $docker_locator exited "
445           .exit_status_s($?));
446   }
447
448   # Determine whether this version of Docker supports memory+swap limits.
449   srun(["srun", "--nodelist=" . $node[0]],
450        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
451       {fork => 1});
452   $docker_limitmem = ($? == 0);
453
454   # Find a non-root Docker user to use.
455   # Tries the default user for the container, then 'crunch', then 'nobody',
456   # testing for whether the actual user id is non-zero.  This defends against
457   # mistakes but not malice, but we intend to harden the security in the future
458   # so we don't want anyone getting used to their jobs running as root in their
459   # Docker containers.
460   my @tryusers = ("", "crunch", "nobody");
461   foreach my $try_user (@tryusers) {
462     my $try_user_arg;
463     if ($try_user eq "") {
464       Log(undef, "Checking if container default user is not UID 0");
465       $try_user_arg = "";
466     } else {
467       Log(undef, "Checking if user '$try_user' is not UID 0");
468       $try_user_arg = "--user=$try_user";
469     }
470     srun(["srun", "--nodelist=" . $node[0]],
471          ["/bin/sh", "-ec",
472           "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
473           " test \$a -ne 0"],
474          {fork => 1});
475     if ($? == 0) {
476       $dockeruserarg = $try_user_arg;
477       if ($try_user eq "") {
478         Log(undef, "Container will run with default user");
479       } else {
480         Log(undef, "Container will run with $dockeruserarg");
481       }
482       last;
483     }
484   }
485
486   if (!defined $dockeruserarg) {
487     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
488   }
489
490   if ($Job->{arvados_sdk_version}) {
491     # The job also specifies an Arvados SDK version.  Add the SDKs to the
492     # tar file for the build script to install.
493     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
494                        $Job->{arvados_sdk_version}));
495     add_git_archive("git", "--git-dir=$git_dir", "archive",
496                     "--prefix=.arvados.sdk/",
497                     $Job->{arvados_sdk_version}, "sdk");
498   }
499 }
500
501 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
502   # If script_version looks like an absolute path, *and* the --git-dir
503   # argument was not given -- which implies we were not invoked by
504   # crunch-dispatch -- we will use the given path as a working
505   # directory instead of resolving script_version to a git commit (or
506   # doing anything else with git).
507   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
508   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
509 }
510 else {
511   # Resolve the given script_version to a git commit sha1. Also, if
512   # the repository is remote, clone it into our local filesystem: this
513   # ensures "git archive" will work, and is necessary to reliably
514   # resolve a symbolic script_version like "master^".
515   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
516
517   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
518
519   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
520
521   # If we're running under crunch-dispatch, it will have already
522   # pulled the appropriate source tree into its own repository, and
523   # given us that repo's path as $git_dir.
524   #
525   # If we're running a "local" job, we might have to fetch content
526   # from a remote repository.
527   #
528   # (Currently crunch-dispatch gives a local path with --git-dir, but
529   # we might as well accept URLs there too in case it changes its
530   # mind.)
531   my $repo = $git_dir || $Job->{'repository'};
532
533   # Repository can be remote or local. If remote, we'll need to fetch it
534   # to a local dir before doing `git log` et al.
535   my $repo_location;
536
537   if ($repo =~ m{://|^[^/]*:}) {
538     # $repo is a git url we can clone, like git:// or https:// or
539     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
540     # not recognized here because distinguishing that from a local
541     # path is too fragile. If you really need something strange here,
542     # use the ssh:// form.
543     $repo_location = 'remote';
544   } elsif ($repo =~ m{^\.*/}) {
545     # $repo is a local path to a git index. We'll also resolve ../foo
546     # to ../foo/.git if the latter is a directory. To help
547     # disambiguate local paths from named hosted repositories, this
548     # form must be given as ./ or ../ if it's a relative path.
549     if (-d "$repo/.git") {
550       $repo = "$repo/.git";
551     }
552     $repo_location = 'local';
553   } else {
554     # $repo is none of the above. It must be the name of a hosted
555     # repository.
556     my $arv_repo_list = api_call("repositories/list",
557                                  'filters' => [['name','=',$repo]]);
558     my @repos_found = @{$arv_repo_list->{'items'}};
559     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
560     if ($n_found > 0) {
561       Log(undef, "Repository '$repo' -> "
562           . join(", ", map { $_->{'uuid'} } @repos_found));
563     }
564     if ($n_found != 1) {
565       croak("Error: Found $n_found repositories with name '$repo'.");
566     }
567     $repo = $repos_found[0]->{'fetch_url'};
568     $repo_location = 'remote';
569   }
570   Log(undef, "Using $repo_location repository '$repo'");
571   $ENV{"CRUNCH_SRC_URL"} = $repo;
572
573   # Resolve given script_version (we'll call that $treeish here) to a
574   # commit sha1 ($commit).
575   my $treeish = $Job->{'script_version'};
576   my $commit;
577   if ($repo_location eq 'remote') {
578     # We minimize excess object-fetching by re-using the same bare
579     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
580     # just keep adding remotes to it as needed.
581     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
582     my $gitcmd = "git --git-dir=\Q$local_repo\E";
583
584     # Set up our local repo for caching remote objects, making
585     # archives, etc.
586     if (!-d $local_repo) {
587       make_path($local_repo) or croak("Error: could not create $local_repo");
588     }
589     # This works (exits 0 and doesn't delete fetched objects) even
590     # if $local_repo is already initialized:
591     `$gitcmd init --bare`;
592     if ($?) {
593       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
594     }
595
596     # If $treeish looks like a hash (or abbrev hash) we look it up in
597     # our local cache first, since that's cheaper. (We don't want to
598     # do that with tags/branches though -- those change over time, so
599     # they should always be resolved by the remote repo.)
600     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
601       # Hide stderr because it's normal for this to fail:
602       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
603       if ($? == 0 &&
604           # Careful not to resolve a branch named abcdeff to commit 1234567:
605           $sha1 =~ /^$treeish/ &&
606           $sha1 =~ /^([0-9a-f]{40})$/s) {
607         $commit = $1;
608         Log(undef, "Commit $commit already present in $local_repo");
609       }
610     }
611
612     if (!defined $commit) {
613       # If $treeish isn't just a hash or abbrev hash, or isn't here
614       # yet, we need to fetch the remote to resolve it correctly.
615
616       # First, remove all local heads. This prevents a name that does
617       # not exist on the remote from resolving to (or colliding with)
618       # a previously fetched branch or tag (possibly from a different
619       # remote).
620       remove_tree("$local_repo/refs/heads", {keep_root => 1});
621
622       Log(undef, "Fetching objects from $repo to $local_repo");
623       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
624       if ($?) {
625         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
626       }
627     }
628
629     # Now that the data is all here, we will use our local repo for
630     # the rest of our git activities.
631     $repo = $local_repo;
632   }
633
634   my $gitcmd = "git --git-dir=\Q$repo\E";
635   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
636   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
637     croak("`$gitcmd rev-list` exited "
638           .exit_status_s($?)
639           .", '$treeish' not found, giving up");
640   }
641   $commit = $1;
642   Log(undef, "Version $treeish is commit $commit");
643
644   if ($commit ne $Job->{'script_version'}) {
645     # Record the real commit id in the database, frozentokey, logs,
646     # etc. -- instead of an abbreviation or a branch name which can
647     # become ambiguous or point to a different commit in the future.
648     if (!$Job->update_attributes('script_version' => $commit)) {
649       croak("Error: failed to update job's script_version attribute");
650     }
651   }
652
653   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
654   add_git_archive("$gitcmd archive ''\Q$commit\E");
655 }
656
657 my $git_archive = combined_git_archive();
658 if (!defined $git_archive) {
659   Log(undef, "Skip install phase (no git archive)");
660   if ($have_slurm) {
661     Log(undef, "Warning: This probably means workers have no source tree!");
662   }
663 }
664 else {
665   my $install_exited;
666   my $install_script_tries_left = 3;
667   for (my $attempts = 0; $attempts < 3; $attempts++) {
668     Log(undef, "Run install script on all workers");
669
670     my @srunargs = ("srun",
671                     "--nodelist=$nodelist",
672                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
673     my @execargs = ("sh", "-c",
674                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
675
676     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
677     my ($install_stderr_r, $install_stderr_w);
678     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
679     set_nonblocking($install_stderr_r);
680     my $installpid = fork();
681     if ($installpid == 0)
682     {
683       close($install_stderr_r);
684       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
685       open(STDOUT, ">&", $install_stderr_w);
686       open(STDERR, ">&", $install_stderr_w);
687       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
688       exit (1);
689     }
690     close($install_stderr_w);
691     # Tell freeze_if_want_freeze how to kill the child, otherwise the
692     # "waitpid(installpid)" loop won't get interrupted by a freeze:
693     $proc{$installpid} = {};
694     my $stderr_buf = '';
695     # Track whether anything appears on stderr other than slurm errors
696     # ("srun: ...") and the "starting: ..." message printed by the
697     # srun subroutine itself:
698     my $stderr_anything_from_script = 0;
699     my $match_our_own_errors = '^(srun: error: |starting: \[)';
700     while ($installpid != waitpid(-1, WNOHANG)) {
701       freeze_if_want_freeze ($installpid);
702       # Wait up to 0.1 seconds for something to appear on stderr, then
703       # do a non-blocking read.
704       my $bits = fhbits($install_stderr_r);
705       select ($bits, undef, $bits, 0.1);
706       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
707       {
708         while ($stderr_buf =~ /^(.*?)\n/) {
709           my $line = $1;
710           substr $stderr_buf, 0, 1+length($line), "";
711           Log(undef, "stderr $line");
712           if ($line !~ /$match_our_own_errors/) {
713             $stderr_anything_from_script = 1;
714           }
715         }
716       }
717     }
718     delete $proc{$installpid};
719     $install_exited = $?;
720     close($install_stderr_r);
721     if (length($stderr_buf) > 0) {
722       if ($stderr_buf !~ /$match_our_own_errors/) {
723         $stderr_anything_from_script = 1;
724       }
725       Log(undef, "stderr $stderr_buf")
726     }
727
728     Log (undef, "Install script exited ".exit_status_s($install_exited));
729     last if $install_exited == 0 || $main::please_freeze;
730     # If the install script fails but doesn't print an error message,
731     # the next thing anyone is likely to do is just run it again in
732     # case it was a transient problem like "slurm communication fails
733     # because the network isn't reliable enough". So we'll just do
734     # that ourselves (up to 3 attempts in total). OTOH, if there is an
735     # error message, the problem is more likely to have a real fix and
736     # we should fail the job so the fixing process can start, instead
737     # of doing 2 more attempts.
738     last if $stderr_anything_from_script;
739   }
740
741   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
742     unlink($tar_filename);
743   }
744
745   if ($install_exited != 0) {
746     croak("Giving up");
747   }
748 }
749
750 foreach (qw (script script_version script_parameters runtime_constraints))
751 {
752   Log (undef,
753        "$_ " .
754        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
755 }
756 foreach (split (/\n/, $Job->{knobs}))
757 {
758   Log (undef, "knob " . $_);
759 }
760
761
762
763 $main::success = undef;
764
765
766
767 ONELEVEL:
768
769 my $thisround_succeeded = 0;
770 my $thisround_failed = 0;
771 my $thisround_failed_multiple = 0;
772 my $working_slot_count = scalar(@slot);
773
774 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
775                        or $a <=> $b } @jobstep_todo;
776 my $level = $jobstep[$jobstep_todo[0]]->{level};
777
778 my $initial_tasks_this_level = 0;
779 foreach my $id (@jobstep_todo) {
780   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
781 }
782
783 # If the number of tasks scheduled at this level #T is smaller than the number
784 # of slots available #S, only use the first #T slots, or the first slot on
785 # each node, whichever number is greater.
786 #
787 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
788 # based on these numbers.  Using fewer slots makes more resources available
789 # to each individual task, which should normally be a better strategy when
790 # there are fewer of them running with less parallelism.
791 #
792 # Note that this calculation is not redone if the initial tasks at
793 # this level queue more tasks at the same level.  This may harm
794 # overall task throughput for that level.
795 my @freeslot;
796 if ($initial_tasks_this_level < @node) {
797   @freeslot = (0..$#node);
798 } elsif ($initial_tasks_this_level < @slot) {
799   @freeslot = (0..$initial_tasks_this_level - 1);
800 } else {
801   @freeslot = (0..$#slot);
802 }
803 my $round_num_freeslots = scalar(@freeslot);
804
805 my %round_max_slots = ();
806 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
807   my $this_slot = $slot[$freeslot[$ii]];
808   my $node_name = $this_slot->{node}->{name};
809   $round_max_slots{$node_name} ||= $this_slot->{cpu};
810   last if (scalar(keys(%round_max_slots)) >= @node);
811 }
812
813 Log(undef, "start level $level with $round_num_freeslots slots");
814 my @holdslot;
815 my %reader;
816 my $progress_is_dirty = 1;
817 my $progress_stats_updated = 0;
818
819 update_progress_stats();
820
821
822 THISROUND:
823 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
824 {
825   # Don't create new tasks if we already know the job's final result.
826   last if defined($main::success);
827
828   my $id = $jobstep_todo[$todo_ptr];
829   my $Jobstep = $jobstep[$id];
830   if ($Jobstep->{level} != $level)
831   {
832     next;
833   }
834
835   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
836   set_nonblocking($reader{$id});
837
838   my $childslot = $freeslot[0];
839   my $childnode = $slot[$childslot]->{node};
840   my $childslotname = join (".",
841                             $slot[$childslot]->{node}->{name},
842                             $slot[$childslot]->{cpu});
843
844   my $childpid = fork();
845   if ($childpid == 0)
846   {
847     $SIG{'INT'} = 'DEFAULT';
848     $SIG{'QUIT'} = 'DEFAULT';
849     $SIG{'TERM'} = 'DEFAULT';
850
851     foreach (values (%reader))
852     {
853       close($_);
854     }
855     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
856     open(STDOUT,">&writer");
857     open(STDERR,">&writer");
858
859     undef $dbh;
860     undef $sth;
861
862     delete $ENV{"GNUPGHOME"};
863     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
864     $ENV{"TASK_QSEQUENCE"} = $id;
865     $ENV{"TASK_SEQUENCE"} = $level;
866     $ENV{"JOB_SCRIPT"} = $Job->{script};
867     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
868       $param =~ tr/a-z/A-Z/;
869       $ENV{"JOB_PARAMETER_$param"} = $value;
870     }
871     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
872     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
873     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
874     $ENV{"HOME"} = $ENV{"TASK_WORK"};
875     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
876     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
877     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
878     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
879
880     $ENV{"GZIP"} = "-n";
881
882     my @srunargs = (
883       "srun",
884       "--nodelist=".$childnode->{name},
885       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
886       "--job-name=$job_id.$id.$$",
887         );
888
889     my $stdbuf = " stdbuf --output=0 --error=0 ";
890
891     my $arv_file_cache = "";
892     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
893       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
894     }
895
896     my $command =
897         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
898         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
899         ."&& cd $ENV{CRUNCH_TMP} "
900         # These environment variables get used explicitly later in
901         # $command.  No tool is expected to read these values directly.
902         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
903         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
904         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
905         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
906     $command .= "&& exec arv-mount --by-pdh --crunchstat-interval=10 --allow-other $arv_file_cache $ENV{TASK_KEEPMOUNT} --exec ";
907     if ($docker_hash)
908     {
909       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
910       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
911       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
912       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
913       # We only set memory limits if Docker lets us limit both memory and swap.
914       # Memory limits alone have been supported longer, but subprocesses tend
915       # to get SIGKILL if they exceed that without any swap limit set.
916       # See #5642 for additional background.
917       if ($docker_limitmem) {
918         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
919       }
920
921       # The source tree and $destdir directory (which we have
922       # installed on the worker host) are available in the container,
923       # under the same path.
924       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
925       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
926
927       # Currently, we make arv-mount's mount point appear at /keep
928       # inside the container (instead of using the same path as the
929       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
930       # crunch scripts and utilities must not rely on this. They must
931       # use $TASK_KEEPMOUNT.
932       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
933       $ENV{TASK_KEEPMOUNT} = "/keep";
934
935       # TASK_WORK is almost exactly like a docker data volume: it
936       # starts out empty, is writable, and persists until no
937       # containers use it any more. We don't use --volumes-from to
938       # share it with other containers: it is only accessible to this
939       # task, and it goes away when this task stops.
940       #
941       # However, a docker data volume is writable only by root unless
942       # the mount point already happens to exist in the container with
943       # different permissions. Therefore, we [1] assume /tmp already
944       # exists in the image and is writable by the crunch user; [2]
945       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
946       # writable if they are created by docker while setting up the
947       # other --volumes); and [3] create $TASK_WORK inside the
948       # container using $build_script.
949       $command .= "--volume=/tmp ";
950       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
951       $ENV{"HOME"} = $ENV{"TASK_WORK"};
952       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
953
954       # TODO: Share a single JOB_WORK volume across all task
955       # containers on a given worker node, and delete it when the job
956       # ends (and, in case that doesn't work, when the next job
957       # starts).
958       #
959       # For now, use the same approach as TASK_WORK above.
960       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
961
962       while (my ($env_key, $env_val) = each %ENV)
963       {
964         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
965           $command .= "--env=\Q$env_key=$env_val\E ";
966         }
967       }
968       $command .= "--env=\QHOME=$ENV{HOME}\E ";
969       $command .= "\Q$docker_hash\E ";
970
971       if ($Job->{arvados_sdk_version}) {
972         $command .= $stdbuf;
973         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
974       } else {
975         $command .= "/bin/sh -c \'python -c " .
976             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
977             ">&2 2>/dev/null; " .
978             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
979             "if which stdbuf >/dev/null ; then " .
980             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
981             " else " .
982             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
983             " fi\'";
984       }
985     } else {
986       # Non-docker run
987       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
988       $command .= $stdbuf;
989       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
990     }
991
992     my @execargs = ('bash', '-c', $command);
993     srun (\@srunargs, \@execargs, undef, $build_script);
994     # exec() failed, we assume nothing happened.
995     die "srun() failed on build script\n";
996   }
997   close("writer");
998   if (!defined $childpid)
999   {
1000     close $reader{$id};
1001     delete $reader{$id};
1002     next;
1003   }
1004   shift @freeslot;
1005   $proc{$childpid} = { jobstep => $id,
1006                        time => time,
1007                        slot => $childslot,
1008                        jobstepname => "$job_id.$id.$childpid",
1009                      };
1010   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1011   $slot[$childslot]->{pid} = $childpid;
1012
1013   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1014   Log ($id, "child $childpid started on $childslotname");
1015   $Jobstep->{starttime} = time;
1016   $Jobstep->{node} = $childnode->{name};
1017   $Jobstep->{slotindex} = $childslot;
1018   delete $Jobstep->{stderr};
1019   delete $Jobstep->{finishtime};
1020   delete $Jobstep->{tempfail};
1021
1022   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1023   $Jobstep->{'arvados_task'}->save;
1024
1025   splice @jobstep_todo, $todo_ptr, 1;
1026   --$todo_ptr;
1027
1028   $progress_is_dirty = 1;
1029
1030   while (!@freeslot
1031          ||
1032          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1033   {
1034     last THISROUND if $main::please_freeze;
1035     if ($main::please_info)
1036     {
1037       $main::please_info = 0;
1038       freeze();
1039       create_output_collection();
1040       save_meta(1);
1041       update_progress_stats();
1042     }
1043     my $gotsome
1044         = readfrompipes ()
1045         + reapchildren ();
1046     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1047     {
1048       check_refresh_wanted();
1049       check_squeue();
1050       update_progress_stats();
1051       select (undef, undef, undef, 0.1);
1052     }
1053     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1054     {
1055       update_progress_stats();
1056     }
1057     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1058                                         $_->{node}->{hold_count} < 4 } @slot);
1059     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1060         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1061     {
1062       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1063           .($thisround_failed+$thisround_succeeded)
1064           .") -- giving up on this round";
1065       Log (undef, $message);
1066       last THISROUND;
1067     }
1068
1069     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1070     for (my $i=$#freeslot; $i>=0; $i--) {
1071       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1072         push @holdslot, (splice @freeslot, $i, 1);
1073       }
1074     }
1075     for (my $i=$#holdslot; $i>=0; $i--) {
1076       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1077         push @freeslot, (splice @holdslot, $i, 1);
1078       }
1079     }
1080
1081     # give up if no nodes are succeeding
1082     if ($working_slot_count < 1) {
1083       Log(undef, "Every node has failed -- giving up");
1084       last THISROUND;
1085     }
1086   }
1087 }
1088
1089
1090 push @freeslot, splice @holdslot;
1091 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1092
1093
1094 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1095 while (%proc)
1096 {
1097   if ($main::please_continue) {
1098     $main::please_continue = 0;
1099     goto THISROUND;
1100   }
1101   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1102   readfrompipes ();
1103   if (!reapchildren())
1104   {
1105     check_refresh_wanted();
1106     check_squeue();
1107     update_progress_stats();
1108     select (undef, undef, undef, 0.1);
1109     killem (keys %proc) if $main::please_freeze;
1110   }
1111 }
1112
1113 update_progress_stats();
1114 freeze_if_want_freeze();
1115
1116
1117 if (!defined $main::success)
1118 {
1119   if (!@jobstep_todo) {
1120     $main::success = 1;
1121   } elsif ($working_slot_count < 1) {
1122     save_output_collection();
1123     save_meta();
1124     exit(EX_RETRY_UNLOCKED);
1125   } elsif ($thisround_succeeded == 0 &&
1126            ($thisround_failed == 0 || $thisround_failed > 4)) {
1127     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1128     Log (undef, $message);
1129     $main::success = 0;
1130   }
1131 }
1132
1133 goto ONELEVEL if !defined $main::success;
1134
1135
1136 release_allocation();
1137 freeze();
1138 my $collated_output = save_output_collection();
1139 Log (undef, "finish");
1140
1141 save_meta();
1142
1143 my $final_state;
1144 if ($collated_output && $main::success) {
1145   $final_state = 'Complete';
1146 } else {
1147   $final_state = 'Failed';
1148 }
1149 $Job->update_attributes('state' => $final_state);
1150
1151 exit (($final_state eq 'Complete') ? 0 : 1);
1152
1153
1154
1155 sub update_progress_stats
1156 {
1157   $progress_stats_updated = time;
1158   return if !$progress_is_dirty;
1159   my ($todo, $done, $running) = (scalar @jobstep_todo,
1160                                  scalar @jobstep_done,
1161                                  scalar keys(%proc));
1162   $Job->{'tasks_summary'} ||= {};
1163   $Job->{'tasks_summary'}->{'todo'} = $todo;
1164   $Job->{'tasks_summary'}->{'done'} = $done;
1165   $Job->{'tasks_summary'}->{'running'} = $running;
1166   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1167   Log (undef, "status: $done done, $running running, $todo todo");
1168   $progress_is_dirty = 0;
1169 }
1170
1171
1172
1173 sub reapchildren
1174 {
1175   my $pid = waitpid (-1, WNOHANG);
1176   return 0 if $pid <= 0;
1177
1178   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1179                   . "."
1180                   . $slot[$proc{$pid}->{slot}]->{cpu});
1181   my $jobstepid = $proc{$pid}->{jobstep};
1182   my $elapsed = time - $proc{$pid}->{time};
1183   my $Jobstep = $jobstep[$jobstepid];
1184
1185   my $childstatus = $?;
1186   my $exitvalue = $childstatus >> 8;
1187   my $exitinfo = "exit ".exit_status_s($childstatus);
1188   $Jobstep->{'arvados_task'}->reload;
1189   my $task_success = $Jobstep->{'arvados_task'}->{success};
1190
1191   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1192
1193   if (!defined $task_success) {
1194     # task did not indicate one way or the other --> fail
1195     Log($jobstepid, sprintf(
1196           "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1197           exit_status_s($childstatus)));
1198     $Jobstep->{'arvados_task'}->{success} = 0;
1199     $Jobstep->{'arvados_task'}->save;
1200     $task_success = 0;
1201   }
1202
1203   if (!$task_success)
1204   {
1205     my $temporary_fail;
1206     $temporary_fail ||= $Jobstep->{tempfail};
1207     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1208
1209     ++$thisround_failed;
1210     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1211
1212     # Check for signs of a failed or misconfigured node
1213     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1214         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1215       # Don't count this against jobstep failure thresholds if this
1216       # node is already suspected faulty and srun exited quickly
1217       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1218           $elapsed < 5) {
1219         Log ($jobstepid, "blaming failure on suspect node " .
1220              $slot[$proc{$pid}->{slot}]->{node}->{name});
1221         $temporary_fail ||= 1;
1222       }
1223       ban_node_by_slot($proc{$pid}->{slot});
1224     }
1225
1226     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1227                              ++$Jobstep->{'failures'},
1228                              $temporary_fail ? 'temporary' : 'permanent',
1229                              $elapsed));
1230
1231     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1232       # Give up on this task, and the whole job
1233       $main::success = 0;
1234     }
1235     # Put this task back on the todo queue
1236     push @jobstep_todo, $jobstepid;
1237     $Job->{'tasks_summary'}->{'failed'}++;
1238   }
1239   else
1240   {
1241     ++$thisround_succeeded;
1242     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1243     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1244     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1245     push @jobstep_done, $jobstepid;
1246     Log ($jobstepid, "success in $elapsed seconds");
1247   }
1248   $Jobstep->{exitcode} = $childstatus;
1249   $Jobstep->{finishtime} = time;
1250   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1251   $Jobstep->{'arvados_task'}->save;
1252   process_stderr ($jobstepid, $task_success);
1253   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1254                            length($Jobstep->{'arvados_task'}->{output}),
1255                            $Jobstep->{'arvados_task'}->{output}));
1256
1257   close $reader{$jobstepid};
1258   delete $reader{$jobstepid};
1259   delete $slot[$proc{$pid}->{slot}]->{pid};
1260   push @freeslot, $proc{$pid}->{slot};
1261   delete $proc{$pid};
1262
1263   if ($task_success) {
1264     # Load new tasks
1265     my $newtask_list = [];
1266     my $newtask_results;
1267     do {
1268       $newtask_results = api_call(
1269         "job_tasks/list",
1270         'where' => {
1271           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1272         },
1273         'order' => 'qsequence',
1274         'offset' => scalar(@$newtask_list),
1275       );
1276       push(@$newtask_list, @{$newtask_results->{items}});
1277     } while (@{$newtask_results->{items}});
1278     foreach my $arvados_task (@$newtask_list) {
1279       my $jobstep = {
1280         'level' => $arvados_task->{'sequence'},
1281         'failures' => 0,
1282         'arvados_task' => $arvados_task
1283       };
1284       push @jobstep, $jobstep;
1285       push @jobstep_todo, $#jobstep;
1286     }
1287   }
1288
1289   $progress_is_dirty = 1;
1290   1;
1291 }
1292
1293 sub check_refresh_wanted
1294 {
1295   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1296   if (@stat && $stat[9] > $latest_refresh) {
1297     $latest_refresh = scalar time;
1298     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1299     for my $attr ('cancelled_at',
1300                   'cancelled_by_user_uuid',
1301                   'cancelled_by_client_uuid',
1302                   'state') {
1303       $Job->{$attr} = $Job2->{$attr};
1304     }
1305     if ($Job->{'state'} ne "Running") {
1306       if ($Job->{'state'} eq "Cancelled") {
1307         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1308       } else {
1309         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1310       }
1311       $main::success = 0;
1312       $main::please_freeze = 1;
1313     }
1314   }
1315 }
1316
1317 sub check_squeue
1318 {
1319   my $last_squeue_check = $squeue_checked;
1320
1321   # Do not call `squeue` or check the kill list more than once every
1322   # 15 seconds.
1323   return if $last_squeue_check > time - 15;
1324   $squeue_checked = time;
1325
1326   # Look for children from which we haven't received stderr data since
1327   # the last squeue check. If no such children exist, all procs are
1328   # alive and there's no need to even look at squeue.
1329   #
1330   # As long as the crunchstat poll interval (10s) is shorter than the
1331   # squeue check interval (15s) this should make the squeue check an
1332   # infrequent event.
1333   my $silent_procs = 0;
1334   for my $jobstep (values %proc)
1335   {
1336     if ($jobstep->{stderr_at} < $last_squeue_check)
1337     {
1338       $silent_procs++;
1339     }
1340   }
1341   return if $silent_procs == 0;
1342
1343   # use killem() on procs whose killtime is reached
1344   while (my ($pid, $jobstep) = each %proc)
1345   {
1346     if (exists $jobstep->{killtime}
1347         && $jobstep->{killtime} <= time
1348         && $jobstep->{stderr_at} < $last_squeue_check)
1349     {
1350       my $sincewhen = "";
1351       if ($jobstep->{stderr_at}) {
1352         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1353       }
1354       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1355       killem ($pid);
1356     }
1357   }
1358
1359   if (!$have_slurm)
1360   {
1361     # here is an opportunity to check for mysterious problems with local procs
1362     return;
1363   }
1364
1365   # Get a list of steps still running.  Note: squeue(1) says --steps
1366   # selects a format (which we override anyway) and allows us to
1367   # specify which steps we're interested in (which we don't).
1368   # Importantly, it also changes the meaning of %j from "job name" to
1369   # "step name" and (although this isn't mentioned explicitly in the
1370   # docs) switches from "one line per job" mode to "one line per step"
1371   # mode. Without it, we'd just get a list of one job, instead of a
1372   # list of N steps.
1373   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1374   if ($? != 0)
1375   {
1376     Log(undef, "warning: squeue exit status $? ($!)");
1377     return;
1378   }
1379   chop @squeue;
1380
1381   # which of my jobsteps are running, according to squeue?
1382   my %ok;
1383   for my $jobstepname (@squeue)
1384   {
1385     $ok{$jobstepname} = 1;
1386   }
1387
1388   # Check for child procs >60s old and not mentioned by squeue.
1389   while (my ($pid, $jobstep) = each %proc)
1390   {
1391     if ($jobstep->{time} < time - 60
1392         && $jobstep->{jobstepname}
1393         && !exists $ok{$jobstep->{jobstepname}}
1394         && !exists $jobstep->{killtime})
1395     {
1396       # According to slurm, this task has ended (successfully or not)
1397       # -- but our srun child hasn't exited. First we must wait (30
1398       # seconds) in case this is just a race between communication
1399       # channels. Then, if our srun child process still hasn't
1400       # terminated, we'll conclude some slurm communication
1401       # error/delay has caused the task to die without notifying srun,
1402       # and we'll kill srun ourselves.
1403       $jobstep->{killtime} = time + 30;
1404       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1405     }
1406   }
1407 }
1408
1409
1410 sub release_allocation
1411 {
1412   if ($have_slurm)
1413   {
1414     Log (undef, "release job allocation");
1415     system "scancel $ENV{SLURM_JOB_ID}";
1416   }
1417 }
1418
1419
1420 sub readfrompipes
1421 {
1422   my $gotsome = 0;
1423   foreach my $job (keys %reader)
1424   {
1425     my $buf;
1426     while (0 < sysread ($reader{$job}, $buf, 8192))
1427     {
1428       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1429       $jobstep[$job]->{stderr_at} = time;
1430       $jobstep[$job]->{stderr} .= $buf;
1431       preprocess_stderr ($job);
1432       if (length ($jobstep[$job]->{stderr}) > 16384)
1433       {
1434         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1435       }
1436       $gotsome = 1;
1437     }
1438   }
1439   return $gotsome;
1440 }
1441
1442
1443 sub preprocess_stderr
1444 {
1445   my $job = shift;
1446
1447   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1448     my $line = $1;
1449     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1450     Log ($job, "stderr $line");
1451     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1452       # whoa.
1453       $main::please_freeze = 1;
1454     }
1455     elsif ($line =~ /srun: error: Node failure on/) {
1456       my $job_slot_index = $jobstep[$job]->{slotindex};
1457       $slot[$job_slot_index]->{node}->{fail_count}++;
1458       $jobstep[$job]->{tempfail} = 1;
1459       ban_node_by_slot($job_slot_index);
1460     }
1461     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1462       $jobstep[$job]->{tempfail} = 1;
1463       ban_node_by_slot($jobstep[$job]->{slotindex});
1464     }
1465     elsif ($line =~ /arvados\.errors\.Keep/) {
1466       $jobstep[$job]->{tempfail} = 1;
1467     }
1468   }
1469 }
1470
1471
1472 sub process_stderr
1473 {
1474   my $job = shift;
1475   my $task_success = shift;
1476   preprocess_stderr ($job);
1477
1478   map {
1479     Log ($job, "stderr $_");
1480   } split ("\n", $jobstep[$job]->{stderr});
1481 }
1482
1483 sub fetch_block
1484 {
1485   my $hash = shift;
1486   my $keep;
1487   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1488     Log(undef, "fetch_block run error from arv-get $hash: $!");
1489     return undef;
1490   }
1491   my $output_block = "";
1492   while (1) {
1493     my $buf;
1494     my $bytes = sysread($keep, $buf, 1024 * 1024);
1495     if (!defined $bytes) {
1496       Log(undef, "fetch_block read error from arv-get: $!");
1497       $output_block = undef;
1498       last;
1499     } elsif ($bytes == 0) {
1500       # sysread returns 0 at the end of the pipe.
1501       last;
1502     } else {
1503       # some bytes were read into buf.
1504       $output_block .= $buf;
1505     }
1506   }
1507   close $keep;
1508   if ($?) {
1509     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1510     $output_block = undef;
1511   }
1512   return $output_block;
1513 }
1514
1515 # Create a collection by concatenating the output of all tasks (each
1516 # task's output is either a manifest fragment, a locator for a
1517 # manifest fragment stored in Keep, or nothing at all). Return the
1518 # portable_data_hash of the new collection.
1519 sub create_output_collection
1520 {
1521   Log (undef, "collate");
1522
1523   my ($child_out, $child_in);
1524   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1525 import arvados
1526 import sys
1527 print (arvados.api("v1").collections().
1528        create(body={"manifest_text": sys.stdin.read()}).
1529        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1530 }, retry_count());
1531
1532   my $task_idx = -1;
1533   my $manifest_size = 0;
1534   for (@jobstep)
1535   {
1536     ++$task_idx;
1537     my $output = $_->{'arvados_task'}->{output};
1538     next if (!defined($output));
1539     my $next_write;
1540     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1541       $next_write = fetch_block($output);
1542     } else {
1543       $next_write = $output;
1544     }
1545     if (defined($next_write)) {
1546       if (!defined(syswrite($child_in, $next_write))) {
1547         # There's been an error writing.  Stop the loop.
1548         # We'll log details about the exit code later.
1549         last;
1550       } else {
1551         $manifest_size += length($next_write);
1552       }
1553     } else {
1554       my $uuid = $_->{'arvados_task'}->{'uuid'};
1555       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1556       $main::success = 0;
1557     }
1558   }
1559   close($child_in);
1560   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1561
1562   my $joboutput;
1563   my $s = IO::Select->new($child_out);
1564   if ($s->can_read(120)) {
1565     sysread($child_out, $joboutput, 1024 * 1024);
1566     waitpid($pid, 0);
1567     if ($?) {
1568       Log(undef, "output collection creation exited " . exit_status_s($?));
1569       $joboutput = undef;
1570     } else {
1571       chomp($joboutput);
1572     }
1573   } else {
1574     Log (undef, "timed out while creating output collection");
1575     foreach my $signal (2, 2, 2, 15, 15, 9) {
1576       kill($signal, $pid);
1577       last if waitpid($pid, WNOHANG) == -1;
1578       sleep(1);
1579     }
1580   }
1581   close($child_out);
1582
1583   return $joboutput;
1584 }
1585
1586 # Calls create_output_collection, logs the result, and returns it.
1587 # If that was successful, save that as the output in the job record.
1588 sub save_output_collection {
1589   my $collated_output = create_output_collection();
1590
1591   if (!$collated_output) {
1592     Log(undef, "Failed to write output collection");
1593   }
1594   else {
1595     Log(undef, "job output $collated_output");
1596     $Job->update_attributes('output' => $collated_output);
1597   }
1598   return $collated_output;
1599 }
1600
1601 sub killem
1602 {
1603   foreach (@_)
1604   {
1605     my $sig = 2;                # SIGINT first
1606     if (exists $proc{$_}->{"sent_$sig"} &&
1607         time - $proc{$_}->{"sent_$sig"} > 4)
1608     {
1609       $sig = 15;                # SIGTERM if SIGINT doesn't work
1610     }
1611     if (exists $proc{$_}->{"sent_$sig"} &&
1612         time - $proc{$_}->{"sent_$sig"} > 4)
1613     {
1614       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1615     }
1616     if (!exists $proc{$_}->{"sent_$sig"})
1617     {
1618       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1619       kill $sig, $_;
1620       select (undef, undef, undef, 0.1);
1621       if ($sig == 2)
1622       {
1623         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1624       }
1625       $proc{$_}->{"sent_$sig"} = time;
1626       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1627     }
1628   }
1629 }
1630
1631
1632 sub fhbits
1633 {
1634   my($bits);
1635   for (@_) {
1636     vec($bits,fileno($_),1) = 1;
1637   }
1638   $bits;
1639 }
1640
1641
1642 # Send log output to Keep via arv-put.
1643 #
1644 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1645 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1646 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1647 # $log_pipe_pid is the pid of the arv-put subprocess.
1648 #
1649 # The only functions that should access these variables directly are:
1650 #
1651 # log_writer_start($logfilename)
1652 #     Starts an arv-put pipe, reading data on stdin and writing it to
1653 #     a $logfilename file in an output collection.
1654 #
1655 # log_writer_read_output([$timeout])
1656 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1657 #     Passes $timeout to the select() call, with a default of 0.01.
1658 #     Returns the result of the last read() call on $log_pipe_out, or
1659 #     -1 if read() wasn't called because select() timed out.
1660 #     Only other log_writer_* functions should need to call this.
1661 #
1662 # log_writer_send($txt)
1663 #     Writes $txt to the output log collection.
1664 #
1665 # log_writer_finish()
1666 #     Closes the arv-put pipe and returns the output that it produces.
1667 #
1668 # log_writer_is_active()
1669 #     Returns a true value if there is currently a live arv-put
1670 #     process, false otherwise.
1671 #
1672 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1673     $log_pipe_pid);
1674
1675 sub log_writer_start($)
1676 {
1677   my $logfilename = shift;
1678   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1679                         'arv-put',
1680                         '--stream',
1681                         '--retries', '3',
1682                         '--filename', $logfilename,
1683                         '-');
1684   $log_pipe_out_buf = "";
1685   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1686 }
1687
1688 sub log_writer_read_output {
1689   my $timeout = shift || 0.01;
1690   my $read = -1;
1691   while ($read && $log_pipe_out_select->can_read($timeout)) {
1692     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1693                  length($log_pipe_out_buf));
1694   }
1695   if (!defined($read)) {
1696     Log(undef, "error reading log manifest from arv-put: $!");
1697   }
1698   return $read;
1699 }
1700
1701 sub log_writer_send($)
1702 {
1703   my $txt = shift;
1704   print $log_pipe_in $txt;
1705   log_writer_read_output();
1706 }
1707
1708 sub log_writer_finish()
1709 {
1710   return unless $log_pipe_pid;
1711
1712   close($log_pipe_in);
1713
1714   my $logger_failed = 0;
1715   my $read_result = log_writer_read_output(120);
1716   if ($read_result == -1) {
1717     $logger_failed = -1;
1718     Log (undef, "timed out reading from 'arv-put'");
1719   } elsif ($read_result != 0) {
1720     $logger_failed = -2;
1721     Log(undef, "failed to read arv-put log manifest to EOF");
1722   }
1723
1724   waitpid($log_pipe_pid, 0);
1725   if ($?) {
1726     $logger_failed ||= $?;
1727     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1728   }
1729
1730   close($log_pipe_out);
1731   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1732   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1733       $log_pipe_out_select = undef;
1734
1735   return $arv_put_output;
1736 }
1737
1738 sub log_writer_is_active() {
1739   return $log_pipe_pid;
1740 }
1741
1742 sub Log                         # ($jobstep_id, $logmessage)
1743 {
1744   if ($_[1] =~ /\n/) {
1745     for my $line (split (/\n/, $_[1])) {
1746       Log ($_[0], $line);
1747     }
1748     return;
1749   }
1750   my $fh = select STDERR; $|=1; select $fh;
1751   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1752   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1753   $message .= "\n";
1754   my $datetime;
1755   if (log_writer_is_active() || -t STDERR) {
1756     my @gmtime = gmtime;
1757     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1758                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1759   }
1760   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1761
1762   if (log_writer_is_active()) {
1763     log_writer_send($datetime . " " . $message);
1764   }
1765 }
1766
1767
1768 sub croak
1769 {
1770   my ($package, $file, $line) = caller;
1771   my $message = "@_ at $file line $line\n";
1772   Log (undef, $message);
1773   freeze() if @jobstep_todo;
1774   create_output_collection() if @jobstep_todo;
1775   cleanup();
1776   save_meta();
1777   die;
1778 }
1779
1780
1781 sub cleanup
1782 {
1783   return unless $Job;
1784   if ($Job->{'state'} eq 'Cancelled') {
1785     $Job->update_attributes('finished_at' => scalar gmtime);
1786   } else {
1787     $Job->update_attributes('state' => 'Failed');
1788   }
1789 }
1790
1791
1792 sub save_meta
1793 {
1794   my $justcheckpoint = shift; # false if this will be the last meta saved
1795   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1796   return unless log_writer_is_active();
1797   my $log_manifest = log_writer_finish();
1798   return unless defined($log_manifest);
1799
1800   if ($Job->{log}) {
1801     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1802     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1803   }
1804
1805   my $log_coll = api_call(
1806     "collections/create", ensure_unique_name => 1, collection => {
1807       manifest_text => $log_manifest,
1808       owner_uuid => $Job->{owner_uuid},
1809       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1810     });
1811   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1812   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1813 }
1814
1815
1816 sub freeze_if_want_freeze
1817 {
1818   if ($main::please_freeze)
1819   {
1820     release_allocation();
1821     if (@_)
1822     {
1823       # kill some srun procs before freeze+stop
1824       map { $proc{$_} = {} } @_;
1825       while (%proc)
1826       {
1827         killem (keys %proc);
1828         select (undef, undef, undef, 0.1);
1829         my $died;
1830         while (($died = waitpid (-1, WNOHANG)) > 0)
1831         {
1832           delete $proc{$died};
1833         }
1834       }
1835     }
1836     freeze();
1837     create_output_collection();
1838     cleanup();
1839     save_meta();
1840     exit 1;
1841   }
1842 }
1843
1844
1845 sub freeze
1846 {
1847   Log (undef, "Freeze not implemented");
1848   return;
1849 }
1850
1851
1852 sub thaw
1853 {
1854   croak ("Thaw not implemented");
1855 }
1856
1857
1858 sub freezequote
1859 {
1860   my $s = shift;
1861   $s =~ s/\\/\\\\/g;
1862   $s =~ s/\n/\\n/g;
1863   return $s;
1864 }
1865
1866
1867 sub freezeunquote
1868 {
1869   my $s = shift;
1870   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1871   return $s;
1872 }
1873
1874
1875 sub srun
1876 {
1877   my $srunargs = shift;
1878   my $execargs = shift;
1879   my $opts = shift || {};
1880   my $stdin = shift;
1881   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1882
1883   $Data::Dumper::Terse = 1;
1884   $Data::Dumper::Indent = 0;
1885   my $show_cmd = Dumper($args);
1886   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1887   $show_cmd =~ s/\n/ /g;
1888   if ($opts->{fork}) {
1889     Log(undef, "starting: $show_cmd");
1890   } else {
1891     # This is a child process: parent is in charge of reading our
1892     # stderr and copying it to Log() if needed.
1893     warn "starting: $show_cmd\n";
1894   }
1895
1896   if (defined $stdin) {
1897     my $child = open STDIN, "-|";
1898     defined $child or die "no fork: $!";
1899     if ($child == 0) {
1900       print $stdin or die $!;
1901       close STDOUT or die $!;
1902       exit 0;
1903     }
1904   }
1905
1906   return system (@$args) if $opts->{fork};
1907
1908   exec @$args;
1909   warn "ENV size is ".length(join(" ",%ENV));
1910   die "exec failed: $!: @$args";
1911 }
1912
1913
1914 sub ban_node_by_slot {
1915   # Don't start any new jobsteps on this node for 60 seconds
1916   my $slotid = shift;
1917   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1918   $slot[$slotid]->{node}->{hold_count}++;
1919   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1920 }
1921
1922 sub must_lock_now
1923 {
1924   my ($lockfile, $error_message) = @_;
1925   open L, ">", $lockfile or croak("$lockfile: $!");
1926   if (!flock L, LOCK_EX|LOCK_NB) {
1927     croak("Can't lock $lockfile: $error_message\n");
1928   }
1929 }
1930
1931 sub find_docker_image {
1932   # Given a Keep locator, check to see if it contains a Docker image.
1933   # If so, return its stream name and Docker hash.
1934   # If not, return undef for both values.
1935   my $locator = shift;
1936   my ($streamname, $filename);
1937   my $image = api_call("collections/get", uuid => $locator);
1938   if ($image) {
1939     foreach my $line (split(/\n/, $image->{manifest_text})) {
1940       my @tokens = split(/\s+/, $line);
1941       next if (!@tokens);
1942       $streamname = shift(@tokens);
1943       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1944         if (defined($filename)) {
1945           return (undef, undef);  # More than one file in the Collection.
1946         } else {
1947           $filename = (split(/:/, $filedata, 3))[2];
1948         }
1949       }
1950     }
1951   }
1952   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1953     return ($streamname, $1);
1954   } else {
1955     return (undef, undef);
1956   }
1957 }
1958
1959 sub retry_count {
1960   # Calculate the number of times an operation should be retried,
1961   # assuming exponential backoff, and that we're willing to retry as
1962   # long as tasks have been running.  Enforce a minimum of 3 retries.
1963   my ($starttime, $endtime, $timediff, $retries);
1964   if (@jobstep) {
1965     $starttime = $jobstep[0]->{starttime};
1966     $endtime = $jobstep[-1]->{finishtime};
1967   }
1968   if (!defined($starttime)) {
1969     $timediff = 0;
1970   } elsif (!defined($endtime)) {
1971     $timediff = time - $starttime;
1972   } else {
1973     $timediff = ($endtime - $starttime) - (time - $endtime);
1974   }
1975   if ($timediff > 0) {
1976     $retries = int(log($timediff) / log(2));
1977   } else {
1978     $retries = 1;  # Use the minimum.
1979   }
1980   return ($retries > 3) ? $retries : 3;
1981 }
1982
1983 sub retry_op {
1984   # Pass in two function references.
1985   # This method will be called with the remaining arguments.
1986   # If it dies, retry it with exponential backoff until it succeeds,
1987   # or until the current retry_count is exhausted.  After each failure
1988   # that can be retried, the second function will be called with
1989   # the current try count (0-based), next try time, and error message.
1990   my $operation = shift;
1991   my $retry_callback = shift;
1992   my $retries = retry_count();
1993   foreach my $try_count (0..$retries) {
1994     my $next_try = time + (2 ** $try_count);
1995     my $result = eval { $operation->(@_); };
1996     if (!$@) {
1997       return $result;
1998     } elsif ($try_count < $retries) {
1999       $retry_callback->($try_count, $next_try, $@);
2000       my $sleep_time = $next_try - time;
2001       sleep($sleep_time) if ($sleep_time > 0);
2002     }
2003   }
2004   # Ensure the error message ends in a newline, so Perl doesn't add
2005   # retry_op's line number to it.
2006   chomp($@);
2007   die($@ . "\n");
2008 }
2009
2010 sub api_call {
2011   # Pass in a /-separated API method name, and arguments for it.
2012   # This function will call that method, retrying as needed until
2013   # the current retry_count is exhausted, with a log on the first failure.
2014   my $method_name = shift;
2015   my $log_api_retry = sub {
2016     my ($try_count, $next_try_at, $errmsg) = @_;
2017     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2018     $errmsg =~ s/\s/ /g;
2019     $errmsg =~ s/\s+$//;
2020     my $retry_msg;
2021     if ($next_try_at < time) {
2022       $retry_msg = "Retrying.";
2023     } else {
2024       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2025       $retry_msg = "Retrying at $next_try_fmt.";
2026     }
2027     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2028   };
2029   my $method = $arv;
2030   foreach my $key (split(/\//, $method_name)) {
2031     $method = $method->{$key};
2032   }
2033   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2034 }
2035
2036 sub exit_status_s {
2037   # Given a $?, return a human-readable exit code string like "0" or
2038   # "1" or "0 with signal 1" or "1 with signal 11".
2039   my $exitcode = shift;
2040   my $s = $exitcode >> 8;
2041   if ($exitcode & 0x7f) {
2042     $s .= " with signal " . ($exitcode & 0x7f);
2043   }
2044   if ($exitcode & 0x80) {
2045     $s .= " with core dump";
2046   }
2047   return $s;
2048 }
2049
2050 sub handle_readall {
2051   # Pass in a glob reference to a file handle.
2052   # Read all its contents and return them as a string.
2053   my $fh_glob_ref = shift;
2054   local $/ = undef;
2055   return <$fh_glob_ref>;
2056 }
2057
2058 sub tar_filename_n {
2059   my $n = shift;
2060   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2061 }
2062
2063 sub add_git_archive {
2064   # Pass in a git archive command as a string or list, a la system().
2065   # This method will save its output to be included in the archive sent to the
2066   # build script.
2067   my $git_input;
2068   $git_tar_count++;
2069   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2070     croak("Failed to save git archive: $!");
2071   }
2072   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2073   close($git_input);
2074   waitpid($git_pid, 0);
2075   close(GIT_ARCHIVE);
2076   if ($?) {
2077     croak("Failed to save git archive: git exited " . exit_status_s($?));
2078   }
2079 }
2080
2081 sub combined_git_archive {
2082   # Combine all saved tar archives into a single archive, then return its
2083   # contents in a string.  Return undef if no archives have been saved.
2084   if ($git_tar_count < 1) {
2085     return undef;
2086   }
2087   my $base_tar_name = tar_filename_n(1);
2088   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2089     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2090     if ($tar_exit != 0) {
2091       croak("Error preparing build archive: tar -A exited " .
2092             exit_status_s($tar_exit));
2093     }
2094   }
2095   if (!open(GIT_TAR, "<", $base_tar_name)) {
2096     croak("Could not open build archive: $!");
2097   }
2098   my $tar_contents = handle_readall(\*GIT_TAR);
2099   close(GIT_TAR);
2100   return $tar_contents;
2101 }
2102
2103 sub set_nonblocking {
2104   my $fh = shift;
2105   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2106   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2107 }
2108
2109 __DATA__
2110 #!/usr/bin/env perl
2111 #
2112 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2113 # server invokes this script on individual compute nodes, or localhost if we're
2114 # running a job locally.  It gets called in two modes:
2115 #
2116 # * No arguments: Installation mode.  Read a tar archive from the DATA
2117 #   file handle; it includes the Crunch script's source code, and
2118 #   maybe SDKs as well.  Those should be installed in the proper
2119 #   locations.  This runs outside of any Docker container, so don't try to
2120 #   introspect Crunch's runtime environment.
2121 #
2122 # * With arguments: Crunch script run mode.  This script should set up the
2123 #   environment, then run the command specified in the arguments.  This runs
2124 #   inside any Docker container.
2125
2126 use Fcntl ':flock';
2127 use File::Path qw( make_path remove_tree );
2128 use POSIX qw(getcwd);
2129
2130 use constant TASK_TEMPFAIL => 111;
2131
2132 # Map SDK subdirectories to the path environments they belong to.
2133 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2134
2135 my $destdir = $ENV{"CRUNCH_SRC"};
2136 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2137 my $repo = $ENV{"CRUNCH_SRC_URL"};
2138 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2139 my $job_work = $ENV{"JOB_WORK"};
2140 my $task_work = $ENV{"TASK_WORK"};
2141
2142 open(STDOUT_ORIG, ">&", STDOUT);
2143 open(STDERR_ORIG, ">&", STDERR);
2144
2145 for my $dir ($destdir, $job_work, $task_work) {
2146   if ($dir) {
2147     make_path $dir;
2148     -e $dir or die "Failed to create temporary directory ($dir): $!";
2149   }
2150 }
2151
2152 if ($task_work) {
2153   remove_tree($task_work, {keep_root => 1});
2154 }
2155
2156 ### Crunch script run mode
2157 if (@ARGV) {
2158   # We want to do routine logging during task 0 only.  This gives the user
2159   # the information they need, but avoids repeating the information for every
2160   # task.
2161   my $Log;
2162   if ($ENV{TASK_SEQUENCE} eq "0") {
2163     $Log = sub {
2164       my $msg = shift;
2165       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2166     };
2167   } else {
2168     $Log = sub { };
2169   }
2170
2171   my $python_src = "$install_dir/python";
2172   my $venv_dir = "$job_work/.arvados.venv";
2173   my $venv_built = -e "$venv_dir/bin/activate";
2174   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2175     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2176                  "--python=python2.7", $venv_dir);
2177     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2178     $venv_built = 1;
2179     $Log->("Built Python SDK virtualenv");
2180   }
2181
2182   my @pysdk_version_cmd = ("python", "-c",
2183     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2184   if ($venv_built) {
2185     $Log->("Running in Python SDK virtualenv");
2186     @pysdk_version_cmd = ();
2187     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2188     @ARGV = ("/bin/sh", "-ec",
2189              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2190   } elsif (-d $python_src) {
2191     $Log->("Warning: virtualenv not found inside Docker container default " .
2192            "\$PATH. Can't install Python SDK.");
2193   }
2194
2195   if (@pysdk_version_cmd) {
2196     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2197     my $pysdk_version = <$pysdk_version_pipe>;
2198     close($pysdk_version_pipe);
2199     if ($? == 0) {
2200       chomp($pysdk_version);
2201       $Log->("Using Arvados SDK version $pysdk_version");
2202     } else {
2203       # A lot could've gone wrong here, but pretty much all of it means that
2204       # Python won't be able to load the Arvados SDK.
2205       $Log->("Warning: Arvados SDK not found");
2206     }
2207   }
2208
2209   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2210     my $sdk_path = "$install_dir/$sdk_dir";
2211     if (-d $sdk_path) {
2212       if ($ENV{$sdk_envkey}) {
2213         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2214       } else {
2215         $ENV{$sdk_envkey} = $sdk_path;
2216       }
2217       $Log->("Arvados SDK added to %s", $sdk_envkey);
2218     }
2219   }
2220
2221   exec(@ARGV);
2222   die "Cannot exec `@ARGV`: $!";
2223 }
2224
2225 ### Installation mode
2226 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2227 flock L, LOCK_EX;
2228 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2229   # This exact git archive (source + arvados sdk) is already installed
2230   # here, so there's no need to reinstall it.
2231
2232   # We must consume our DATA section, though: otherwise the process
2233   # feeding it to us will get SIGPIPE.
2234   my $buf;
2235   while (read(DATA, $buf, 65536)) { }
2236
2237   exit(0);
2238 }
2239
2240 unlink "$destdir.archive_hash";
2241 mkdir $destdir;
2242
2243 do {
2244   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2245   local $SIG{PIPE} = "IGNORE";
2246   warn "Extracting archive: $archive_hash\n";
2247   # --ignore-zeros is necessary sometimes: depending on how much NUL
2248   # padding tar -A put on our combined archive (which in turn depends
2249   # on the length of the component archives) tar without
2250   # --ignore-zeros will exit before consuming stdin and cause close()
2251   # to fail on the resulting SIGPIPE.
2252   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2253     die "Error launching 'tar -xC $destdir': $!";
2254   }
2255   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2256   # get SIGPIPE.  We must feed it data incrementally.
2257   my $tar_input;
2258   while (read(DATA, $tar_input, 65536)) {
2259     print TARX $tar_input;
2260   }
2261   if(!close(TARX)) {
2262     die "'tar -xC $destdir' exited $?: $!";
2263   }
2264 };
2265
2266 mkdir $install_dir;
2267
2268 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2269 if (-d $sdk_root) {
2270   foreach my $sdk_lang (("python",
2271                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2272     if (-d "$sdk_root/$sdk_lang") {
2273       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2274         die "Failed to install $sdk_lang SDK: $!";
2275       }
2276     }
2277   }
2278 }
2279
2280 my $python_dir = "$install_dir/python";
2281 if ((-d $python_dir) and can_run("python2.7")) {
2282   open(my $egg_info_pipe, "-|",
2283        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2284   my @egg_info_errors = <$egg_info_pipe>;
2285   close($egg_info_pipe);
2286
2287   if ($?) {
2288     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2289       # egg_info apparently failed because it couldn't ask git for a build tag.
2290       # Specify no build tag.
2291       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2292       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2293       close($pysdk_cfg);
2294     } else {
2295       my $egg_info_exit = $? >> 8;
2296       foreach my $errline (@egg_info_errors) {
2297         warn $errline;
2298       }
2299       warn "python setup.py egg_info failed: exit $egg_info_exit";
2300       exit ($egg_info_exit || 1);
2301     }
2302   }
2303 }
2304
2305 # Hide messages from the install script (unless it fails: shell_or_die
2306 # will show $destdir.log in that case).
2307 open(STDOUT, ">>", "$destdir.log");
2308 open(STDERR, ">&", STDOUT);
2309
2310 if (-e "$destdir/crunch_scripts/install") {
2311     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2312 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2313     # Old version
2314     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2315 } elsif (-e "./install.sh") {
2316     shell_or_die (undef, "./install.sh", $install_dir);
2317 }
2318
2319 if ($archive_hash) {
2320     unlink "$destdir.archive_hash.new";
2321     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2322     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2323 }
2324
2325 close L;
2326
2327 sub can_run {
2328   my $command_name = shift;
2329   open(my $which, "-|", "which", $command_name);
2330   while (<$which>) { }
2331   close($which);
2332   return ($? == 0);
2333 }
2334
2335 sub shell_or_die
2336 {
2337   my $exitcode = shift;
2338
2339   if ($ENV{"DEBUG"}) {
2340     print STDERR "@_\n";
2341   }
2342   if (system (@_) != 0) {
2343     my $err = $!;
2344     my $code = $?;
2345     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2346     open STDERR, ">&STDERR_ORIG";
2347     system ("cat $destdir.log >&2");
2348     warn "@_ failed ($err): $exitstatus";
2349     if (defined($exitcode)) {
2350       exit $exitcode;
2351     }
2352     else {
2353       exit (($code >> 8) || 1);
2354     }
2355   }
2356 }
2357
2358 __DATA__