70d05f023c93147a2dcad2b4b61335102acfc832
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 my $docker_run_args = "";
131 GetOptions('force-unlock' => \$force_unlock,
132            'git-dir=s' => \$git_dir,
133            'job=s' => \$jobspec,
134            'job-api-token=s' => \$job_api_token,
135            'no-clear-tmp' => \$no_clear_tmp,
136            'resume-stash=s' => \$resume_stash,
137            'docker-bin=s' => \$docker_bin,
138            'docker-run-args=s' => \$docker_run_args,
139     );
140
141 if (defined $job_api_token) {
142   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
143 }
144
145 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
146
147
148 $SIG{'USR1'} = sub
149 {
150   $main::ENV{CRUNCH_DEBUG} = 1;
151 };
152 $SIG{'USR2'} = sub
153 {
154   $main::ENV{CRUNCH_DEBUG} = 0;
155 };
156
157 my $arv = Arvados->new('apiVersion' => 'v1');
158
159 my $Job;
160 my $job_id;
161 my $dbh;
162 my $sth;
163 my @jobstep;
164
165 my $local_job;
166 if ($jobspec =~ /^[-a-z\d]+$/)
167 {
168   # $jobspec is an Arvados UUID, not a JSON job specification
169   $Job = api_call("jobs/get", uuid => $jobspec);
170   $local_job = 0;
171 }
172 else
173 {
174   $local_job = JSON::decode_json($jobspec);
175 }
176
177
178 # Make sure our workers (our slurm nodes, localhost, or whatever) are
179 # at least able to run basic commands: they aren't down or severely
180 # misconfigured.
181 my $cmd = ['true'];
182 if (($Job || $local_job)->{docker_image_locator}) {
183   $cmd = [$docker_bin, 'ps', '-q'];
184 }
185 Log(undef, "Sanity check is `@$cmd`");
186 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
187      $cmd,
188      {fork => 1});
189 if ($? != 0) {
190   Log(undef, "Sanity check failed: ".exit_status_s($?));
191   exit EX_TEMPFAIL;
192 }
193 Log(undef, "Sanity check OK");
194
195
196 my $User = api_call("users/current");
197
198 if (!$local_job) {
199   if (!$force_unlock) {
200     # Claim this job, and make sure nobody else does
201     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
202     if ($@) {
203       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
204       exit EX_TEMPFAIL;
205     };
206   }
207 }
208 else
209 {
210   if (!$resume_stash)
211   {
212     map { croak ("No $_ specified") unless $local_job->{$_} }
213     qw(script script_version script_parameters);
214   }
215
216   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
217   $local_job->{'started_at'} = gmtime;
218   $local_job->{'state'} = 'Running';
219
220   $Job = api_call("jobs/create", job => $local_job);
221 }
222 $job_id = $Job->{'uuid'};
223
224 my $keep_logfile = $job_id . '.log.txt';
225 log_writer_start($keep_logfile);
226
227 $Job->{'runtime_constraints'} ||= {};
228 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
229 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
230
231 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
232 if ($? == 0) {
233   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
234   chomp($gem_versions);
235   chop($gem_versions);  # Closing parentheses
236 } else {
237   $gem_versions = "";
238 }
239 Log(undef,
240     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
241
242 Log (undef, "check slurm allocation");
243 my @slot;
244 my @node;
245 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
246 my @sinfo;
247 if (!$have_slurm)
248 {
249   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
250   push @sinfo, "$localcpus localhost";
251 }
252 if (exists $ENV{SLURM_NODELIST})
253 {
254   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
255 }
256 foreach (@sinfo)
257 {
258   my ($ncpus, $slurm_nodelist) = split;
259   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
260
261   my @nodelist;
262   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
263   {
264     my $nodelist = $1;
265     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
266     {
267       my $ranges = $1;
268       foreach (split (",", $ranges))
269       {
270         my ($a, $b);
271         if (/(\d+)-(\d+)/)
272         {
273           $a = $1;
274           $b = $2;
275         }
276         else
277         {
278           $a = $_;
279           $b = $_;
280         }
281         push @nodelist, map {
282           my $n = $nodelist;
283           $n =~ s/\[[-,\d]+\]/$_/;
284           $n;
285         } ($a..$b);
286       }
287     }
288     else
289     {
290       push @nodelist, $nodelist;
291     }
292   }
293   foreach my $nodename (@nodelist)
294   {
295     Log (undef, "node $nodename - $ncpus slots");
296     my $node = { name => $nodename,
297                  ncpus => $ncpus,
298                  # The number of consecutive times a task has been dispatched
299                  # to this node and failed.
300                  losing_streak => 0,
301                  # The number of consecutive times that SLURM has reported
302                  # a node failure since the last successful task.
303                  fail_count => 0,
304                  # Don't dispatch work to this node until this time
305                  # (in seconds since the epoch) has passed.
306                  hold_until => 0 };
307     foreach my $cpu (1..$ncpus)
308     {
309       push @slot, { node => $node,
310                     cpu => $cpu };
311     }
312   }
313   push @node, @nodelist;
314 }
315
316
317
318 # Ensure that we get one jobstep running on each allocated node before
319 # we start overloading nodes with concurrent steps
320
321 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
322
323
324 $Job->update_attributes(
325   'tasks_summary' => { 'failed' => 0,
326                        'todo' => 1,
327                        'running' => 0,
328                        'done' => 0 });
329
330 Log (undef, "start");
331 $SIG{'INT'} = sub { $main::please_freeze = 1; };
332 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
333 $SIG{'TERM'} = \&croak;
334 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
335 $SIG{'ALRM'} = sub { $main::please_info = 1; };
336 $SIG{'CONT'} = sub { $main::please_continue = 1; };
337 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
338
339 $main::please_freeze = 0;
340 $main::please_info = 0;
341 $main::please_continue = 0;
342 $main::please_refresh = 0;
343 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
344
345 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
346 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
347 $ENV{"JOB_UUID"} = $job_id;
348
349
350 my @jobstep_todo = ();
351 my @jobstep_done = ();
352 my @jobstep_tomerge = ();
353 my $jobstep_tomerge_level = 0;
354 my $squeue_checked = 0;
355 my $latest_refresh = scalar time;
356
357
358
359 if (defined $Job->{thawedfromkey})
360 {
361   thaw ($Job->{thawedfromkey});
362 }
363 else
364 {
365   my $first_task = api_call("job_tasks/create", job_task => {
366     'job_uuid' => $Job->{'uuid'},
367     'sequence' => 0,
368     'qsequence' => 0,
369     'parameters' => {},
370   });
371   push @jobstep, { 'level' => 0,
372                    'failures' => 0,
373                    'arvados_task' => $first_task,
374                  };
375   push @jobstep_todo, 0;
376 }
377
378
379 if (!$have_slurm)
380 {
381   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
382 }
383
384 my $build_script = handle_readall(\*DATA);
385 my $nodelist = join(",", @node);
386 my $git_tar_count = 0;
387
388 if (!defined $no_clear_tmp) {
389   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
390   Log (undef, "Clean work dirs");
391
392   my $cleanpid = fork();
393   if ($cleanpid == 0)
394   {
395     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
396     # Then clean up work directories.
397     # TODO: When #5036 is done and widely deployed, we can limit mount's
398     # -t option to simply fuse.keep.
399     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
400           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
401     exit (1);
402   }
403   while (1)
404   {
405     last if $cleanpid == waitpid (-1, WNOHANG);
406     freeze_if_want_freeze ($cleanpid);
407     select (undef, undef, undef, 0.1);
408   }
409   if ($?) {
410     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
411     exit(EX_RETRY_UNLOCKED);
412   }
413 }
414
415 # If this job requires a Docker image, install that.
416 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
417 if ($docker_locator = $Job->{docker_image_locator}) {
418   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
419   if (!$docker_hash)
420   {
421     croak("No Docker image hash found from locator $docker_locator");
422   }
423   $docker_stream =~ s/^\.//;
424   my $docker_install_script = qq{
425 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
426     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
427 fi
428 };
429   my $docker_pid = fork();
430   if ($docker_pid == 0)
431   {
432     srun (["srun", "--nodelist=" . join(',', @node)],
433           ["/bin/sh", "-ec", $docker_install_script]);
434     exit ($?);
435   }
436   while (1)
437   {
438     last if $docker_pid == waitpid (-1, WNOHANG);
439     freeze_if_want_freeze ($docker_pid);
440     select (undef, undef, undef, 0.1);
441   }
442   if ($? != 0)
443   {
444     croak("Installing Docker image from $docker_locator exited "
445           .exit_status_s($?));
446   }
447
448   # Determine whether this version of Docker supports memory+swap limits.
449   srun(["srun", "--nodelist=" . $node[0]],
450        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
451       {fork => 1});
452   $docker_limitmem = ($? == 0);
453
454   # Find a non-root Docker user to use.
455   # Tries the default user for the container, then 'crunch', then 'nobody',
456   # testing for whether the actual user id is non-zero.  This defends against
457   # mistakes but not malice, but we intend to harden the security in the future
458   # so we don't want anyone getting used to their jobs running as root in their
459   # Docker containers.
460   my @tryusers = ("", "crunch", "nobody");
461   foreach my $try_user (@tryusers) {
462     my $try_user_arg;
463     if ($try_user eq "") {
464       Log(undef, "Checking if container default user is not UID 0");
465       $try_user_arg = "";
466     } else {
467       Log(undef, "Checking if user '$try_user' is not UID 0");
468       $try_user_arg = "--user=$try_user";
469     }
470     srun(["srun", "--nodelist=" . $node[0]],
471          ["/bin/sh", "-ec",
472           "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
473           " test \$a -ne 0"],
474          {fork => 1});
475     if ($? == 0) {
476       $dockeruserarg = $try_user_arg;
477       if ($try_user eq "") {
478         Log(undef, "Container will run with default user");
479       } else {
480         Log(undef, "Container will run with $dockeruserarg");
481       }
482       last;
483     }
484   }
485
486   if (!defined $dockeruserarg) {
487     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
488   }
489
490   if ($Job->{arvados_sdk_version}) {
491     # The job also specifies an Arvados SDK version.  Add the SDKs to the
492     # tar file for the build script to install.
493     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
494                        $Job->{arvados_sdk_version}));
495     add_git_archive("git", "--git-dir=$git_dir", "archive",
496                     "--prefix=.arvados.sdk/",
497                     $Job->{arvados_sdk_version}, "sdk");
498   }
499 }
500
501 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
502   # If script_version looks like an absolute path, *and* the --git-dir
503   # argument was not given -- which implies we were not invoked by
504   # crunch-dispatch -- we will use the given path as a working
505   # directory instead of resolving script_version to a git commit (or
506   # doing anything else with git).
507   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
508   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
509 }
510 else {
511   # Resolve the given script_version to a git commit sha1. Also, if
512   # the repository is remote, clone it into our local filesystem: this
513   # ensures "git archive" will work, and is necessary to reliably
514   # resolve a symbolic script_version like "master^".
515   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
516
517   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
518
519   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
520
521   # If we're running under crunch-dispatch, it will have already
522   # pulled the appropriate source tree into its own repository, and
523   # given us that repo's path as $git_dir.
524   #
525   # If we're running a "local" job, we might have to fetch content
526   # from a remote repository.
527   #
528   # (Currently crunch-dispatch gives a local path with --git-dir, but
529   # we might as well accept URLs there too in case it changes its
530   # mind.)
531   my $repo = $git_dir || $Job->{'repository'};
532
533   # Repository can be remote or local. If remote, we'll need to fetch it
534   # to a local dir before doing `git log` et al.
535   my $repo_location;
536
537   if ($repo =~ m{://|^[^/]*:}) {
538     # $repo is a git url we can clone, like git:// or https:// or
539     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
540     # not recognized here because distinguishing that from a local
541     # path is too fragile. If you really need something strange here,
542     # use the ssh:// form.
543     $repo_location = 'remote';
544   } elsif ($repo =~ m{^\.*/}) {
545     # $repo is a local path to a git index. We'll also resolve ../foo
546     # to ../foo/.git if the latter is a directory. To help
547     # disambiguate local paths from named hosted repositories, this
548     # form must be given as ./ or ../ if it's a relative path.
549     if (-d "$repo/.git") {
550       $repo = "$repo/.git";
551     }
552     $repo_location = 'local';
553   } else {
554     # $repo is none of the above. It must be the name of a hosted
555     # repository.
556     my $arv_repo_list = api_call("repositories/list",
557                                  'filters' => [['name','=',$repo]]);
558     my @repos_found = @{$arv_repo_list->{'items'}};
559     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
560     if ($n_found > 0) {
561       Log(undef, "Repository '$repo' -> "
562           . join(", ", map { $_->{'uuid'} } @repos_found));
563     }
564     if ($n_found != 1) {
565       croak("Error: Found $n_found repositories with name '$repo'.");
566     }
567     $repo = $repos_found[0]->{'fetch_url'};
568     $repo_location = 'remote';
569   }
570   Log(undef, "Using $repo_location repository '$repo'");
571   $ENV{"CRUNCH_SRC_URL"} = $repo;
572
573   # Resolve given script_version (we'll call that $treeish here) to a
574   # commit sha1 ($commit).
575   my $treeish = $Job->{'script_version'};
576   my $commit;
577   if ($repo_location eq 'remote') {
578     # We minimize excess object-fetching by re-using the same bare
579     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
580     # just keep adding remotes to it as needed.
581     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
582     my $gitcmd = "git --git-dir=\Q$local_repo\E";
583
584     # Set up our local repo for caching remote objects, making
585     # archives, etc.
586     if (!-d $local_repo) {
587       make_path($local_repo) or croak("Error: could not create $local_repo");
588     }
589     # This works (exits 0 and doesn't delete fetched objects) even
590     # if $local_repo is already initialized:
591     `$gitcmd init --bare`;
592     if ($?) {
593       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
594     }
595
596     # If $treeish looks like a hash (or abbrev hash) we look it up in
597     # our local cache first, since that's cheaper. (We don't want to
598     # do that with tags/branches though -- those change over time, so
599     # they should always be resolved by the remote repo.)
600     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
601       # Hide stderr because it's normal for this to fail:
602       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
603       if ($? == 0 &&
604           # Careful not to resolve a branch named abcdeff to commit 1234567:
605           $sha1 =~ /^$treeish/ &&
606           $sha1 =~ /^([0-9a-f]{40})$/s) {
607         $commit = $1;
608         Log(undef, "Commit $commit already present in $local_repo");
609       }
610     }
611
612     if (!defined $commit) {
613       # If $treeish isn't just a hash or abbrev hash, or isn't here
614       # yet, we need to fetch the remote to resolve it correctly.
615
616       # First, remove all local heads. This prevents a name that does
617       # not exist on the remote from resolving to (or colliding with)
618       # a previously fetched branch or tag (possibly from a different
619       # remote).
620       remove_tree("$local_repo/refs/heads", {keep_root => 1});
621
622       Log(undef, "Fetching objects from $repo to $local_repo");
623       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
624       if ($?) {
625         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
626       }
627     }
628
629     # Now that the data is all here, we will use our local repo for
630     # the rest of our git activities.
631     $repo = $local_repo;
632   }
633
634   my $gitcmd = "git --git-dir=\Q$repo\E";
635   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
636   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
637     croak("`$gitcmd rev-list` exited "
638           .exit_status_s($?)
639           .", '$treeish' not found, giving up");
640   }
641   $commit = $1;
642   Log(undef, "Version $treeish is commit $commit");
643
644   if ($commit ne $Job->{'script_version'}) {
645     # Record the real commit id in the database, frozentokey, logs,
646     # etc. -- instead of an abbreviation or a branch name which can
647     # become ambiguous or point to a different commit in the future.
648     if (!$Job->update_attributes('script_version' => $commit)) {
649       croak("Error: failed to update job's script_version attribute");
650     }
651   }
652
653   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
654   add_git_archive("$gitcmd archive ''\Q$commit\E");
655 }
656
657 my $git_archive = combined_git_archive();
658 if (!defined $git_archive) {
659   Log(undef, "Skip install phase (no git archive)");
660   if ($have_slurm) {
661     Log(undef, "Warning: This probably means workers have no source tree!");
662   }
663 }
664 else {
665   my $install_exited;
666   my $install_script_tries_left = 3;
667   for (my $attempts = 0; $attempts < 3; $attempts++) {
668     Log(undef, "Run install script on all workers");
669
670     my @srunargs = ("srun",
671                     "--nodelist=$nodelist",
672                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
673     my @execargs = ("sh", "-c",
674                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
675
676     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
677     my ($install_stderr_r, $install_stderr_w);
678     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
679     set_nonblocking($install_stderr_r);
680     my $installpid = fork();
681     if ($installpid == 0)
682     {
683       close($install_stderr_r);
684       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
685       open(STDOUT, ">&", $install_stderr_w);
686       open(STDERR, ">&", $install_stderr_w);
687       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
688       exit (1);
689     }
690     close($install_stderr_w);
691     # Tell freeze_if_want_freeze how to kill the child, otherwise the
692     # "waitpid(installpid)" loop won't get interrupted by a freeze:
693     $proc{$installpid} = {};
694     my $stderr_buf = '';
695     # Track whether anything appears on stderr other than slurm errors
696     # ("srun: ...") and the "starting: ..." message printed by the
697     # srun subroutine itself:
698     my $stderr_anything_from_script = 0;
699     my $match_our_own_errors = '^(srun: error: |starting: \[)';
700     while ($installpid != waitpid(-1, WNOHANG)) {
701       freeze_if_want_freeze ($installpid);
702       # Wait up to 0.1 seconds for something to appear on stderr, then
703       # do a non-blocking read.
704       my $bits = fhbits($install_stderr_r);
705       select ($bits, undef, $bits, 0.1);
706       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
707       {
708         while ($stderr_buf =~ /^(.*?)\n/) {
709           my $line = $1;
710           substr $stderr_buf, 0, 1+length($line), "";
711           Log(undef, "stderr $line");
712           if ($line !~ /$match_our_own_errors/) {
713             $stderr_anything_from_script = 1;
714           }
715         }
716       }
717     }
718     delete $proc{$installpid};
719     $install_exited = $?;
720     close($install_stderr_r);
721     if (length($stderr_buf) > 0) {
722       if ($stderr_buf !~ /$match_our_own_errors/) {
723         $stderr_anything_from_script = 1;
724       }
725       Log(undef, "stderr $stderr_buf")
726     }
727
728     Log (undef, "Install script exited ".exit_status_s($install_exited));
729     last if $install_exited == 0 || $main::please_freeze;
730     # If the install script fails but doesn't print an error message,
731     # the next thing anyone is likely to do is just run it again in
732     # case it was a transient problem like "slurm communication fails
733     # because the network isn't reliable enough". So we'll just do
734     # that ourselves (up to 3 attempts in total). OTOH, if there is an
735     # error message, the problem is more likely to have a real fix and
736     # we should fail the job so the fixing process can start, instead
737     # of doing 2 more attempts.
738     last if $stderr_anything_from_script;
739   }
740
741   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
742     unlink($tar_filename);
743   }
744
745   if ($install_exited != 0) {
746     croak("Giving up");
747   }
748 }
749
750 foreach (qw (script script_version script_parameters runtime_constraints))
751 {
752   Log (undef,
753        "$_ " .
754        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
755 }
756 foreach (split (/\n/, $Job->{knobs}))
757 {
758   Log (undef, "knob " . $_);
759 }
760
761
762
763 $main::success = undef;
764
765
766
767 ONELEVEL:
768
769 my $thisround_succeeded = 0;
770 my $thisround_failed = 0;
771 my $thisround_failed_multiple = 0;
772 my $working_slot_count = scalar(@slot);
773
774 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
775                        or $a <=> $b } @jobstep_todo;
776 my $level = $jobstep[$jobstep_todo[0]]->{level};
777
778 my $initial_tasks_this_level = 0;
779 foreach my $id (@jobstep_todo) {
780   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
781 }
782
783 # If the number of tasks scheduled at this level #T is smaller than the number
784 # of slots available #S, only use the first #T slots, or the first slot on
785 # each node, whichever number is greater.
786 #
787 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
788 # based on these numbers.  Using fewer slots makes more resources available
789 # to each individual task, which should normally be a better strategy when
790 # there are fewer of them running with less parallelism.
791 #
792 # Note that this calculation is not redone if the initial tasks at
793 # this level queue more tasks at the same level.  This may harm
794 # overall task throughput for that level.
795 my @freeslot;
796 if ($initial_tasks_this_level < @node) {
797   @freeslot = (0..$#node);
798 } elsif ($initial_tasks_this_level < @slot) {
799   @freeslot = (0..$initial_tasks_this_level - 1);
800 } else {
801   @freeslot = (0..$#slot);
802 }
803 my $round_num_freeslots = scalar(@freeslot);
804
805 my %round_max_slots = ();
806 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
807   my $this_slot = $slot[$freeslot[$ii]];
808   my $node_name = $this_slot->{node}->{name};
809   $round_max_slots{$node_name} ||= $this_slot->{cpu};
810   last if (scalar(keys(%round_max_slots)) >= @node);
811 }
812
813 Log(undef, "start level $level with $round_num_freeslots slots");
814 my @holdslot;
815 my %reader;
816 my $progress_is_dirty = 1;
817 my $progress_stats_updated = 0;
818
819 update_progress_stats();
820
821
822 THISROUND:
823 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
824 {
825   # Don't create new tasks if we already know the job's final result.
826   last if defined($main::success);
827
828   my $id = $jobstep_todo[$todo_ptr];
829   my $Jobstep = $jobstep[$id];
830   if ($Jobstep->{level} != $level)
831   {
832     next;
833   }
834
835   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
836   set_nonblocking($reader{$id});
837
838   my $childslot = $freeslot[0];
839   my $childnode = $slot[$childslot]->{node};
840   my $childslotname = join (".",
841                             $slot[$childslot]->{node}->{name},
842                             $slot[$childslot]->{cpu});
843
844   my $childpid = fork();
845   if ($childpid == 0)
846   {
847     $SIG{'INT'} = 'DEFAULT';
848     $SIG{'QUIT'} = 'DEFAULT';
849     $SIG{'TERM'} = 'DEFAULT';
850
851     foreach (values (%reader))
852     {
853       close($_);
854     }
855     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
856     open(STDOUT,">&writer");
857     open(STDERR,">&writer");
858
859     undef $dbh;
860     undef $sth;
861
862     delete $ENV{"GNUPGHOME"};
863     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
864     $ENV{"TASK_QSEQUENCE"} = $id;
865     $ENV{"TASK_SEQUENCE"} = $level;
866     $ENV{"JOB_SCRIPT"} = $Job->{script};
867     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
868       $param =~ tr/a-z/A-Z/;
869       $ENV{"JOB_PARAMETER_$param"} = $value;
870     }
871     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
872     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
873     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
874     $ENV{"HOME"} = $ENV{"TASK_WORK"};
875     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
876     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
877     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
878
879     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
880
881     $ENV{"GZIP"} = "-n";
882
883     my @srunargs = (
884       "srun",
885       "--nodelist=".$childnode->{name},
886       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
887       "--job-name=$job_id.$id.$$",
888         );
889
890     my $stdbuf = " stdbuf --output=0 --error=0 ";
891
892     my $arv_file_cache = "";
893     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
894       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
895     }
896
897     my $command =
898         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
899         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
900         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
901         # These environment variables get used explicitly later in
902         # $command.  No tool is expected to read these values directly.
903         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
904         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
905         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
906         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
907
908     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
909     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
910     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
911
912     if ($docker_hash)
913     {
914       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
915       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
916       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
917       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
918       # We only set memory limits if Docker lets us limit both memory and swap.
919       # Memory limits alone have been supported longer, but subprocesses tend
920       # to get SIGKILL if they exceed that without any swap limit set.
921       # See #5642 for additional background.
922       if ($docker_limitmem) {
923         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
924       }
925
926       # The source tree and $destdir directory (which we have
927       # installed on the worker host) are available in the container,
928       # under the same path.
929       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
930       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
931
932       # Currently, we make the "by_pdh" directory in arv-mount's mount
933       # point appear at /keep inside the container (instead of using
934       # the same path as the host like we do with CRUNCH_SRC and
935       # CRUNCH_INSTALL). However, crunch scripts and utilities must
936       # not rely on this. They must use $TASK_KEEPMOUNT.
937       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
938       $ENV{TASK_KEEPMOUNT} = "/keep";
939
940       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
941       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
942       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
943
944       # TASK_WORK is almost exactly like a docker data volume: it
945       # starts out empty, is writable, and persists until no
946       # containers use it any more. We don't use --volumes-from to
947       # share it with other containers: it is only accessible to this
948       # task, and it goes away when this task stops.
949       #
950       # However, a docker data volume is writable only by root unless
951       # the mount point already happens to exist in the container with
952       # different permissions. Therefore, we [1] assume /tmp already
953       # exists in the image and is writable by the crunch user; [2]
954       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
955       # writable if they are created by docker while setting up the
956       # other --volumes); and [3] create $TASK_WORK inside the
957       # container using $build_script.
958       $command .= "--volume=/tmp ";
959       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
960       $ENV{"HOME"} = $ENV{"TASK_WORK"};
961       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
962
963       # TODO: Share a single JOB_WORK volume across all task
964       # containers on a given worker node, and delete it when the job
965       # ends (and, in case that doesn't work, when the next job
966       # starts).
967       #
968       # For now, use the same approach as TASK_WORK above.
969       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
970
971       while (my ($env_key, $env_val) = each %ENV)
972       {
973         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
974           $command .= "--env=\Q$env_key=$env_val\E ";
975         }
976       }
977       $command .= "--env=\QHOME=$ENV{HOME}\E ";
978       $command .= "\Q$docker_hash\E ";
979
980       if ($Job->{arvados_sdk_version}) {
981         $command .= $stdbuf;
982         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
983       } else {
984         $command .= "/bin/sh -c \'python -c " .
985             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
986             ">&2 2>/dev/null; " .
987             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
988             "if which stdbuf >/dev/null ; then " .
989             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
990             " else " .
991             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
992             " fi\'";
993       }
994     } else {
995       # Non-docker run
996       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
997       $command .= $stdbuf;
998       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
999     }
1000
1001     my @execargs = ('bash', '-c', $command);
1002     srun (\@srunargs, \@execargs, undef, $build_script);
1003     # exec() failed, we assume nothing happened.
1004     die "srun() failed on build script\n";
1005   }
1006   close("writer");
1007   if (!defined $childpid)
1008   {
1009     close $reader{$id};
1010     delete $reader{$id};
1011     next;
1012   }
1013   shift @freeslot;
1014   $proc{$childpid} = { jobstep => $id,
1015                        time => time,
1016                        slot => $childslot,
1017                        jobstepname => "$job_id.$id.$childpid",
1018                      };
1019   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1020   $slot[$childslot]->{pid} = $childpid;
1021
1022   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1023   Log ($id, "child $childpid started on $childslotname");
1024   $Jobstep->{starttime} = time;
1025   $Jobstep->{node} = $childnode->{name};
1026   $Jobstep->{slotindex} = $childslot;
1027   delete $Jobstep->{stderr};
1028   delete $Jobstep->{finishtime};
1029   delete $Jobstep->{tempfail};
1030
1031   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1032   $Jobstep->{'arvados_task'}->save;
1033
1034   splice @jobstep_todo, $todo_ptr, 1;
1035   --$todo_ptr;
1036
1037   $progress_is_dirty = 1;
1038
1039   while (!@freeslot
1040          ||
1041          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1042   {
1043     last THISROUND if $main::please_freeze;
1044     if ($main::please_info)
1045     {
1046       $main::please_info = 0;
1047       freeze();
1048       create_output_collection();
1049       save_meta(1);
1050       update_progress_stats();
1051     }
1052     my $gotsome
1053         = readfrompipes ()
1054         + reapchildren ();
1055     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1056     {
1057       check_refresh_wanted();
1058       check_squeue();
1059       update_progress_stats();
1060       select (undef, undef, undef, 0.1);
1061     }
1062     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1063     {
1064       update_progress_stats();
1065     }
1066     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1067                                         $_->{node}->{hold_count} < 4 } @slot);
1068     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1069         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1070     {
1071       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1072           .($thisround_failed+$thisround_succeeded)
1073           .") -- giving up on this round";
1074       Log (undef, $message);
1075       last THISROUND;
1076     }
1077
1078     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1079     for (my $i=$#freeslot; $i>=0; $i--) {
1080       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1081         push @holdslot, (splice @freeslot, $i, 1);
1082       }
1083     }
1084     for (my $i=$#holdslot; $i>=0; $i--) {
1085       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1086         push @freeslot, (splice @holdslot, $i, 1);
1087       }
1088     }
1089
1090     # give up if no nodes are succeeding
1091     if ($working_slot_count < 1) {
1092       Log(undef, "Every node has failed -- giving up");
1093       last THISROUND;
1094     }
1095   }
1096 }
1097
1098
1099 push @freeslot, splice @holdslot;
1100 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1101
1102
1103 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1104 while (%proc)
1105 {
1106   if ($main::please_continue) {
1107     $main::please_continue = 0;
1108     goto THISROUND;
1109   }
1110   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1111   readfrompipes ();
1112   if (!reapchildren())
1113   {
1114     check_refresh_wanted();
1115     check_squeue();
1116     update_progress_stats();
1117     select (undef, undef, undef, 0.1);
1118     killem (keys %proc) if $main::please_freeze;
1119   }
1120 }
1121
1122 update_progress_stats();
1123 freeze_if_want_freeze();
1124
1125
1126 if (!defined $main::success)
1127 {
1128   if (!@jobstep_todo) {
1129     $main::success = 1;
1130   } elsif ($working_slot_count < 1) {
1131     save_output_collection();
1132     save_meta();
1133     exit(EX_RETRY_UNLOCKED);
1134   } elsif ($thisround_succeeded == 0 &&
1135            ($thisround_failed == 0 || $thisround_failed > 4)) {
1136     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1137     Log (undef, $message);
1138     $main::success = 0;
1139   }
1140 }
1141
1142 goto ONELEVEL if !defined $main::success;
1143
1144
1145 release_allocation();
1146 freeze();
1147 my $collated_output = save_output_collection();
1148 Log (undef, "finish");
1149
1150 save_meta();
1151
1152 my $final_state;
1153 if ($collated_output && $main::success) {
1154   $final_state = 'Complete';
1155 } else {
1156   $final_state = 'Failed';
1157 }
1158 $Job->update_attributes('state' => $final_state);
1159
1160 exit (($final_state eq 'Complete') ? 0 : 1);
1161
1162
1163
1164 sub update_progress_stats
1165 {
1166   $progress_stats_updated = time;
1167   return if !$progress_is_dirty;
1168   my ($todo, $done, $running) = (scalar @jobstep_todo,
1169                                  scalar @jobstep_done,
1170                                  scalar keys(%proc));
1171   $Job->{'tasks_summary'} ||= {};
1172   $Job->{'tasks_summary'}->{'todo'} = $todo;
1173   $Job->{'tasks_summary'}->{'done'} = $done;
1174   $Job->{'tasks_summary'}->{'running'} = $running;
1175   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1176   Log (undef, "status: $done done, $running running, $todo todo");
1177   $progress_is_dirty = 0;
1178 }
1179
1180
1181
1182 sub reapchildren
1183 {
1184   my $pid = waitpid (-1, WNOHANG);
1185   return 0 if $pid <= 0;
1186
1187   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1188                   . "."
1189                   . $slot[$proc{$pid}->{slot}]->{cpu});
1190   my $jobstepid = $proc{$pid}->{jobstep};
1191   my $elapsed = time - $proc{$pid}->{time};
1192   my $Jobstep = $jobstep[$jobstepid];
1193
1194   my $childstatus = $?;
1195   my $exitvalue = $childstatus >> 8;
1196   my $exitinfo = "exit ".exit_status_s($childstatus);
1197   $Jobstep->{'arvados_task'}->reload;
1198   my $task_success = $Jobstep->{'arvados_task'}->{success};
1199
1200   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1201
1202   if (!defined $task_success) {
1203     # task did not indicate one way or the other --> fail
1204     Log($jobstepid, sprintf(
1205           "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1206           exit_status_s($childstatus)));
1207     $Jobstep->{'arvados_task'}->{success} = 0;
1208     $Jobstep->{'arvados_task'}->save;
1209     $task_success = 0;
1210   }
1211
1212   if (!$task_success)
1213   {
1214     my $temporary_fail;
1215     $temporary_fail ||= $Jobstep->{tempfail};
1216     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1217
1218     ++$thisround_failed;
1219     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1220
1221     # Check for signs of a failed or misconfigured node
1222     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1223         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1224       # Don't count this against jobstep failure thresholds if this
1225       # node is already suspected faulty and srun exited quickly
1226       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1227           $elapsed < 5) {
1228         Log ($jobstepid, "blaming failure on suspect node " .
1229              $slot[$proc{$pid}->{slot}]->{node}->{name});
1230         $temporary_fail ||= 1;
1231       }
1232       ban_node_by_slot($proc{$pid}->{slot});
1233     }
1234
1235     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1236                              ++$Jobstep->{'failures'},
1237                              $temporary_fail ? 'temporary' : 'permanent',
1238                              $elapsed));
1239
1240     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1241       # Give up on this task, and the whole job
1242       $main::success = 0;
1243     }
1244     # Put this task back on the todo queue
1245     push @jobstep_todo, $jobstepid;
1246     $Job->{'tasks_summary'}->{'failed'}++;
1247   }
1248   else
1249   {
1250     ++$thisround_succeeded;
1251     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1252     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1253     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1254     push @jobstep_done, $jobstepid;
1255     Log ($jobstepid, "success in $elapsed seconds");
1256   }
1257   $Jobstep->{exitcode} = $childstatus;
1258   $Jobstep->{finishtime} = time;
1259   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1260   $Jobstep->{'arvados_task'}->save;
1261   process_stderr ($jobstepid, $task_success);
1262   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1263                            length($Jobstep->{'arvados_task'}->{output}),
1264                            $Jobstep->{'arvados_task'}->{output}));
1265
1266   close $reader{$jobstepid};
1267   delete $reader{$jobstepid};
1268   delete $slot[$proc{$pid}->{slot}]->{pid};
1269   push @freeslot, $proc{$pid}->{slot};
1270   delete $proc{$pid};
1271
1272   if ($task_success) {
1273     # Load new tasks
1274     my $newtask_list = [];
1275     my $newtask_results;
1276     do {
1277       $newtask_results = api_call(
1278         "job_tasks/list",
1279         'where' => {
1280           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1281         },
1282         'order' => 'qsequence',
1283         'offset' => scalar(@$newtask_list),
1284       );
1285       push(@$newtask_list, @{$newtask_results->{items}});
1286     } while (@{$newtask_results->{items}});
1287     foreach my $arvados_task (@$newtask_list) {
1288       my $jobstep = {
1289         'level' => $arvados_task->{'sequence'},
1290         'failures' => 0,
1291         'arvados_task' => $arvados_task
1292       };
1293       push @jobstep, $jobstep;
1294       push @jobstep_todo, $#jobstep;
1295     }
1296   }
1297
1298   $progress_is_dirty = 1;
1299   1;
1300 }
1301
1302 sub check_refresh_wanted
1303 {
1304   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1305   if (@stat && $stat[9] > $latest_refresh) {
1306     $latest_refresh = scalar time;
1307     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1308     for my $attr ('cancelled_at',
1309                   'cancelled_by_user_uuid',
1310                   'cancelled_by_client_uuid',
1311                   'state') {
1312       $Job->{$attr} = $Job2->{$attr};
1313     }
1314     if ($Job->{'state'} ne "Running") {
1315       if ($Job->{'state'} eq "Cancelled") {
1316         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1317       } else {
1318         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1319       }
1320       $main::success = 0;
1321       $main::please_freeze = 1;
1322     }
1323   }
1324 }
1325
1326 sub check_squeue
1327 {
1328   my $last_squeue_check = $squeue_checked;
1329
1330   # Do not call `squeue` or check the kill list more than once every
1331   # 15 seconds.
1332   return if $last_squeue_check > time - 15;
1333   $squeue_checked = time;
1334
1335   # Look for children from which we haven't received stderr data since
1336   # the last squeue check. If no such children exist, all procs are
1337   # alive and there's no need to even look at squeue.
1338   #
1339   # As long as the crunchstat poll interval (10s) is shorter than the
1340   # squeue check interval (15s) this should make the squeue check an
1341   # infrequent event.
1342   my $silent_procs = 0;
1343   for my $jobstep (values %proc)
1344   {
1345     if ($jobstep->{stderr_at} < $last_squeue_check)
1346     {
1347       $silent_procs++;
1348     }
1349   }
1350   return if $silent_procs == 0;
1351
1352   # use killem() on procs whose killtime is reached
1353   while (my ($pid, $jobstep) = each %proc)
1354   {
1355     if (exists $jobstep->{killtime}
1356         && $jobstep->{killtime} <= time
1357         && $jobstep->{stderr_at} < $last_squeue_check)
1358     {
1359       my $sincewhen = "";
1360       if ($jobstep->{stderr_at}) {
1361         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1362       }
1363       Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1364       killem ($pid);
1365     }
1366   }
1367
1368   if (!$have_slurm)
1369   {
1370     # here is an opportunity to check for mysterious problems with local procs
1371     return;
1372   }
1373
1374   # Get a list of steps still running.  Note: squeue(1) says --steps
1375   # selects a format (which we override anyway) and allows us to
1376   # specify which steps we're interested in (which we don't).
1377   # Importantly, it also changes the meaning of %j from "job name" to
1378   # "step name" and (although this isn't mentioned explicitly in the
1379   # docs) switches from "one line per job" mode to "one line per step"
1380   # mode. Without it, we'd just get a list of one job, instead of a
1381   # list of N steps.
1382   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1383   if ($? != 0)
1384   {
1385     Log(undef, "warning: squeue exit status $? ($!)");
1386     return;
1387   }
1388   chop @squeue;
1389
1390   # which of my jobsteps are running, according to squeue?
1391   my %ok;
1392   for my $jobstepname (@squeue)
1393   {
1394     $ok{$jobstepname} = 1;
1395   }
1396
1397   # Check for child procs >60s old and not mentioned by squeue.
1398   while (my ($pid, $jobstep) = each %proc)
1399   {
1400     if ($jobstep->{time} < time - 60
1401         && $jobstep->{jobstepname}
1402         && !exists $ok{$jobstep->{jobstepname}}
1403         && !exists $jobstep->{killtime})
1404     {
1405       # According to slurm, this task has ended (successfully or not)
1406       # -- but our srun child hasn't exited. First we must wait (30
1407       # seconds) in case this is just a race between communication
1408       # channels. Then, if our srun child process still hasn't
1409       # terminated, we'll conclude some slurm communication
1410       # error/delay has caused the task to die without notifying srun,
1411       # and we'll kill srun ourselves.
1412       $jobstep->{killtime} = time + 30;
1413       Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1414     }
1415   }
1416 }
1417
1418
1419 sub release_allocation
1420 {
1421   if ($have_slurm)
1422   {
1423     Log (undef, "release job allocation");
1424     system "scancel $ENV{SLURM_JOB_ID}";
1425   }
1426 }
1427
1428
1429 sub readfrompipes
1430 {
1431   my $gotsome = 0;
1432   foreach my $job (keys %reader)
1433   {
1434     my $buf;
1435     while (0 < sysread ($reader{$job}, $buf, 8192))
1436     {
1437       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1438       $jobstep[$job]->{stderr_at} = time;
1439       $jobstep[$job]->{stderr} .= $buf;
1440       preprocess_stderr ($job);
1441       if (length ($jobstep[$job]->{stderr}) > 16384)
1442       {
1443         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1444       }
1445       $gotsome = 1;
1446     }
1447   }
1448   return $gotsome;
1449 }
1450
1451
1452 sub preprocess_stderr
1453 {
1454   my $job = shift;
1455
1456   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1457     my $line = $1;
1458     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1459     Log ($job, "stderr $line");
1460     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1461       # whoa.
1462       $main::please_freeze = 1;
1463     }
1464     elsif ($line =~ /srun: error: Node failure on/) {
1465       my $job_slot_index = $jobstep[$job]->{slotindex};
1466       $slot[$job_slot_index]->{node}->{fail_count}++;
1467       $jobstep[$job]->{tempfail} = 1;
1468       ban_node_by_slot($job_slot_index);
1469     }
1470     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1471       $jobstep[$job]->{tempfail} = 1;
1472       ban_node_by_slot($jobstep[$job]->{slotindex});
1473     }
1474     elsif ($line =~ /arvados\.errors\.Keep/) {
1475       $jobstep[$job]->{tempfail} = 1;
1476     }
1477   }
1478 }
1479
1480
1481 sub process_stderr
1482 {
1483   my $job = shift;
1484   my $task_success = shift;
1485   preprocess_stderr ($job);
1486
1487   map {
1488     Log ($job, "stderr $_");
1489   } split ("\n", $jobstep[$job]->{stderr});
1490 }
1491
1492 sub fetch_block
1493 {
1494   my $hash = shift;
1495   my $keep;
1496   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1497     Log(undef, "fetch_block run error from arv-get $hash: $!");
1498     return undef;
1499   }
1500   my $output_block = "";
1501   while (1) {
1502     my $buf;
1503     my $bytes = sysread($keep, $buf, 1024 * 1024);
1504     if (!defined $bytes) {
1505       Log(undef, "fetch_block read error from arv-get: $!");
1506       $output_block = undef;
1507       last;
1508     } elsif ($bytes == 0) {
1509       # sysread returns 0 at the end of the pipe.
1510       last;
1511     } else {
1512       # some bytes were read into buf.
1513       $output_block .= $buf;
1514     }
1515   }
1516   close $keep;
1517   if ($?) {
1518     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1519     $output_block = undef;
1520   }
1521   return $output_block;
1522 }
1523
1524 # Create a collection by concatenating the output of all tasks (each
1525 # task's output is either a manifest fragment, a locator for a
1526 # manifest fragment stored in Keep, or nothing at all). Return the
1527 # portable_data_hash of the new collection.
1528 sub create_output_collection
1529 {
1530   Log (undef, "collate");
1531
1532   my ($child_out, $child_in);
1533   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1534 import arvados
1535 import sys
1536 print (arvados.api("v1").collections().
1537        create(body={"manifest_text": sys.stdin.read()}).
1538        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1539 }, retry_count());
1540
1541   my $task_idx = -1;
1542   my $manifest_size = 0;
1543   for (@jobstep)
1544   {
1545     ++$task_idx;
1546     my $output = $_->{'arvados_task'}->{output};
1547     next if (!defined($output));
1548     my $next_write;
1549     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1550       $next_write = fetch_block($output);
1551     } else {
1552       $next_write = $output;
1553     }
1554     if (defined($next_write)) {
1555       if (!defined(syswrite($child_in, $next_write))) {
1556         # There's been an error writing.  Stop the loop.
1557         # We'll log details about the exit code later.
1558         last;
1559       } else {
1560         $manifest_size += length($next_write);
1561       }
1562     } else {
1563       my $uuid = $_->{'arvados_task'}->{'uuid'};
1564       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1565       $main::success = 0;
1566     }
1567   }
1568   close($child_in);
1569   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1570
1571   my $joboutput;
1572   my $s = IO::Select->new($child_out);
1573   if ($s->can_read(120)) {
1574     sysread($child_out, $joboutput, 1024 * 1024);
1575     waitpid($pid, 0);
1576     if ($?) {
1577       Log(undef, "output collection creation exited " . exit_status_s($?));
1578       $joboutput = undef;
1579     } else {
1580       chomp($joboutput);
1581     }
1582   } else {
1583     Log (undef, "timed out while creating output collection");
1584     foreach my $signal (2, 2, 2, 15, 15, 9) {
1585       kill($signal, $pid);
1586       last if waitpid($pid, WNOHANG) == -1;
1587       sleep(1);
1588     }
1589   }
1590   close($child_out);
1591
1592   return $joboutput;
1593 }
1594
1595 # Calls create_output_collection, logs the result, and returns it.
1596 # If that was successful, save that as the output in the job record.
1597 sub save_output_collection {
1598   my $collated_output = create_output_collection();
1599
1600   if (!$collated_output) {
1601     Log(undef, "Failed to write output collection");
1602   }
1603   else {
1604     Log(undef, "job output $collated_output");
1605     $Job->update_attributes('output' => $collated_output);
1606   }
1607   return $collated_output;
1608 }
1609
1610 sub killem
1611 {
1612   foreach (@_)
1613   {
1614     my $sig = 2;                # SIGINT first
1615     if (exists $proc{$_}->{"sent_$sig"} &&
1616         time - $proc{$_}->{"sent_$sig"} > 4)
1617     {
1618       $sig = 15;                # SIGTERM if SIGINT doesn't work
1619     }
1620     if (exists $proc{$_}->{"sent_$sig"} &&
1621         time - $proc{$_}->{"sent_$sig"} > 4)
1622     {
1623       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1624     }
1625     if (!exists $proc{$_}->{"sent_$sig"})
1626     {
1627       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1628       kill $sig, $_;
1629       select (undef, undef, undef, 0.1);
1630       if ($sig == 2)
1631       {
1632         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1633       }
1634       $proc{$_}->{"sent_$sig"} = time;
1635       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1636     }
1637   }
1638 }
1639
1640
1641 sub fhbits
1642 {
1643   my($bits);
1644   for (@_) {
1645     vec($bits,fileno($_),1) = 1;
1646   }
1647   $bits;
1648 }
1649
1650
1651 # Send log output to Keep via arv-put.
1652 #
1653 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1654 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1655 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1656 # $log_pipe_pid is the pid of the arv-put subprocess.
1657 #
1658 # The only functions that should access these variables directly are:
1659 #
1660 # log_writer_start($logfilename)
1661 #     Starts an arv-put pipe, reading data on stdin and writing it to
1662 #     a $logfilename file in an output collection.
1663 #
1664 # log_writer_read_output([$timeout])
1665 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1666 #     Passes $timeout to the select() call, with a default of 0.01.
1667 #     Returns the result of the last read() call on $log_pipe_out, or
1668 #     -1 if read() wasn't called because select() timed out.
1669 #     Only other log_writer_* functions should need to call this.
1670 #
1671 # log_writer_send($txt)
1672 #     Writes $txt to the output log collection.
1673 #
1674 # log_writer_finish()
1675 #     Closes the arv-put pipe and returns the output that it produces.
1676 #
1677 # log_writer_is_active()
1678 #     Returns a true value if there is currently a live arv-put
1679 #     process, false otherwise.
1680 #
1681 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1682     $log_pipe_pid);
1683
1684 sub log_writer_start($)
1685 {
1686   my $logfilename = shift;
1687   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1688                         'arv-put',
1689                         '--stream',
1690                         '--retries', '3',
1691                         '--filename', $logfilename,
1692                         '-');
1693   $log_pipe_out_buf = "";
1694   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1695 }
1696
1697 sub log_writer_read_output {
1698   my $timeout = shift || 0.01;
1699   my $read = -1;
1700   while ($read && $log_pipe_out_select->can_read($timeout)) {
1701     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1702                  length($log_pipe_out_buf));
1703   }
1704   if (!defined($read)) {
1705     Log(undef, "error reading log manifest from arv-put: $!");
1706   }
1707   return $read;
1708 }
1709
1710 sub log_writer_send($)
1711 {
1712   my $txt = shift;
1713   print $log_pipe_in $txt;
1714   log_writer_read_output();
1715 }
1716
1717 sub log_writer_finish()
1718 {
1719   return unless $log_pipe_pid;
1720
1721   close($log_pipe_in);
1722
1723   my $logger_failed = 0;
1724   my $read_result = log_writer_read_output(120);
1725   if ($read_result == -1) {
1726     $logger_failed = -1;
1727     Log (undef, "timed out reading from 'arv-put'");
1728   } elsif ($read_result != 0) {
1729     $logger_failed = -2;
1730     Log(undef, "failed to read arv-put log manifest to EOF");
1731   }
1732
1733   waitpid($log_pipe_pid, 0);
1734   if ($?) {
1735     $logger_failed ||= $?;
1736     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1737   }
1738
1739   close($log_pipe_out);
1740   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1741   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1742       $log_pipe_out_select = undef;
1743
1744   return $arv_put_output;
1745 }
1746
1747 sub log_writer_is_active() {
1748   return $log_pipe_pid;
1749 }
1750
1751 sub Log                         # ($jobstep_id, $logmessage)
1752 {
1753   if ($_[1] =~ /\n/) {
1754     for my $line (split (/\n/, $_[1])) {
1755       Log ($_[0], $line);
1756     }
1757     return;
1758   }
1759   my $fh = select STDERR; $|=1; select $fh;
1760   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1761   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1762   $message .= "\n";
1763   my $datetime;
1764   if (log_writer_is_active() || -t STDERR) {
1765     my @gmtime = gmtime;
1766     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1767                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1768   }
1769   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1770
1771   if (log_writer_is_active()) {
1772     log_writer_send($datetime . " " . $message);
1773   }
1774 }
1775
1776
1777 sub croak
1778 {
1779   my ($package, $file, $line) = caller;
1780   my $message = "@_ at $file line $line\n";
1781   Log (undef, $message);
1782   freeze() if @jobstep_todo;
1783   create_output_collection() if @jobstep_todo;
1784   cleanup();
1785   save_meta();
1786   die;
1787 }
1788
1789
1790 sub cleanup
1791 {
1792   return unless $Job;
1793   if ($Job->{'state'} eq 'Cancelled') {
1794     $Job->update_attributes('finished_at' => scalar gmtime);
1795   } else {
1796     $Job->update_attributes('state' => 'Failed');
1797   }
1798 }
1799
1800
1801 sub save_meta
1802 {
1803   my $justcheckpoint = shift; # false if this will be the last meta saved
1804   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1805   return unless log_writer_is_active();
1806   my $log_manifest = log_writer_finish();
1807   return unless defined($log_manifest);
1808
1809   if ($Job->{log}) {
1810     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1811     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1812   }
1813
1814   my $log_coll = api_call(
1815     "collections/create", ensure_unique_name => 1, collection => {
1816       manifest_text => $log_manifest,
1817       owner_uuid => $Job->{owner_uuid},
1818       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1819     });
1820   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1821   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1822 }
1823
1824
1825 sub freeze_if_want_freeze
1826 {
1827   if ($main::please_freeze)
1828   {
1829     release_allocation();
1830     if (@_)
1831     {
1832       # kill some srun procs before freeze+stop
1833       map { $proc{$_} = {} } @_;
1834       while (%proc)
1835       {
1836         killem (keys %proc);
1837         select (undef, undef, undef, 0.1);
1838         my $died;
1839         while (($died = waitpid (-1, WNOHANG)) > 0)
1840         {
1841           delete $proc{$died};
1842         }
1843       }
1844     }
1845     freeze();
1846     create_output_collection();
1847     cleanup();
1848     save_meta();
1849     exit 1;
1850   }
1851 }
1852
1853
1854 sub freeze
1855 {
1856   Log (undef, "Freeze not implemented");
1857   return;
1858 }
1859
1860
1861 sub thaw
1862 {
1863   croak ("Thaw not implemented");
1864 }
1865
1866
1867 sub freezequote
1868 {
1869   my $s = shift;
1870   $s =~ s/\\/\\\\/g;
1871   $s =~ s/\n/\\n/g;
1872   return $s;
1873 }
1874
1875
1876 sub freezeunquote
1877 {
1878   my $s = shift;
1879   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1880   return $s;
1881 }
1882
1883
1884 sub srun
1885 {
1886   my $srunargs = shift;
1887   my $execargs = shift;
1888   my $opts = shift || {};
1889   my $stdin = shift;
1890   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1891
1892   $Data::Dumper::Terse = 1;
1893   $Data::Dumper::Indent = 0;
1894   my $show_cmd = Dumper($args);
1895   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1896   $show_cmd =~ s/\n/ /g;
1897   if ($opts->{fork}) {
1898     Log(undef, "starting: $show_cmd");
1899   } else {
1900     # This is a child process: parent is in charge of reading our
1901     # stderr and copying it to Log() if needed.
1902     warn "starting: $show_cmd\n";
1903   }
1904
1905   if (defined $stdin) {
1906     my $child = open STDIN, "-|";
1907     defined $child or die "no fork: $!";
1908     if ($child == 0) {
1909       print $stdin or die $!;
1910       close STDOUT or die $!;
1911       exit 0;
1912     }
1913   }
1914
1915   return system (@$args) if $opts->{fork};
1916
1917   exec @$args;
1918   warn "ENV size is ".length(join(" ",%ENV));
1919   die "exec failed: $!: @$args";
1920 }
1921
1922
1923 sub ban_node_by_slot {
1924   # Don't start any new jobsteps on this node for 60 seconds
1925   my $slotid = shift;
1926   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1927   $slot[$slotid]->{node}->{hold_count}++;
1928   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1929 }
1930
1931 sub must_lock_now
1932 {
1933   my ($lockfile, $error_message) = @_;
1934   open L, ">", $lockfile or croak("$lockfile: $!");
1935   if (!flock L, LOCK_EX|LOCK_NB) {
1936     croak("Can't lock $lockfile: $error_message\n");
1937   }
1938 }
1939
1940 sub find_docker_image {
1941   # Given a Keep locator, check to see if it contains a Docker image.
1942   # If so, return its stream name and Docker hash.
1943   # If not, return undef for both values.
1944   my $locator = shift;
1945   my ($streamname, $filename);
1946   my $image = api_call("collections/get", uuid => $locator);
1947   if ($image) {
1948     foreach my $line (split(/\n/, $image->{manifest_text})) {
1949       my @tokens = split(/\s+/, $line);
1950       next if (!@tokens);
1951       $streamname = shift(@tokens);
1952       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1953         if (defined($filename)) {
1954           return (undef, undef);  # More than one file in the Collection.
1955         } else {
1956           $filename = (split(/:/, $filedata, 3))[2];
1957         }
1958       }
1959     }
1960   }
1961   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1962     return ($streamname, $1);
1963   } else {
1964     return (undef, undef);
1965   }
1966 }
1967
1968 sub retry_count {
1969   # Calculate the number of times an operation should be retried,
1970   # assuming exponential backoff, and that we're willing to retry as
1971   # long as tasks have been running.  Enforce a minimum of 3 retries.
1972   my ($starttime, $endtime, $timediff, $retries);
1973   if (@jobstep) {
1974     $starttime = $jobstep[0]->{starttime};
1975     $endtime = $jobstep[-1]->{finishtime};
1976   }
1977   if (!defined($starttime)) {
1978     $timediff = 0;
1979   } elsif (!defined($endtime)) {
1980     $timediff = time - $starttime;
1981   } else {
1982     $timediff = ($endtime - $starttime) - (time - $endtime);
1983   }
1984   if ($timediff > 0) {
1985     $retries = int(log($timediff) / log(2));
1986   } else {
1987     $retries = 1;  # Use the minimum.
1988   }
1989   return ($retries > 3) ? $retries : 3;
1990 }
1991
1992 sub retry_op {
1993   # Pass in two function references.
1994   # This method will be called with the remaining arguments.
1995   # If it dies, retry it with exponential backoff until it succeeds,
1996   # or until the current retry_count is exhausted.  After each failure
1997   # that can be retried, the second function will be called with
1998   # the current try count (0-based), next try time, and error message.
1999   my $operation = shift;
2000   my $retry_callback = shift;
2001   my $retries = retry_count();
2002   foreach my $try_count (0..$retries) {
2003     my $next_try = time + (2 ** $try_count);
2004     my $result = eval { $operation->(@_); };
2005     if (!$@) {
2006       return $result;
2007     } elsif ($try_count < $retries) {
2008       $retry_callback->($try_count, $next_try, $@);
2009       my $sleep_time = $next_try - time;
2010       sleep($sleep_time) if ($sleep_time > 0);
2011     }
2012   }
2013   # Ensure the error message ends in a newline, so Perl doesn't add
2014   # retry_op's line number to it.
2015   chomp($@);
2016   die($@ . "\n");
2017 }
2018
2019 sub api_call {
2020   # Pass in a /-separated API method name, and arguments for it.
2021   # This function will call that method, retrying as needed until
2022   # the current retry_count is exhausted, with a log on the first failure.
2023   my $method_name = shift;
2024   my $log_api_retry = sub {
2025     my ($try_count, $next_try_at, $errmsg) = @_;
2026     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2027     $errmsg =~ s/\s/ /g;
2028     $errmsg =~ s/\s+$//;
2029     my $retry_msg;
2030     if ($next_try_at < time) {
2031       $retry_msg = "Retrying.";
2032     } else {
2033       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2034       $retry_msg = "Retrying at $next_try_fmt.";
2035     }
2036     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2037   };
2038   my $method = $arv;
2039   foreach my $key (split(/\//, $method_name)) {
2040     $method = $method->{$key};
2041   }
2042   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2043 }
2044
2045 sub exit_status_s {
2046   # Given a $?, return a human-readable exit code string like "0" or
2047   # "1" or "0 with signal 1" or "1 with signal 11".
2048   my $exitcode = shift;
2049   my $s = $exitcode >> 8;
2050   if ($exitcode & 0x7f) {
2051     $s .= " with signal " . ($exitcode & 0x7f);
2052   }
2053   if ($exitcode & 0x80) {
2054     $s .= " with core dump";
2055   }
2056   return $s;
2057 }
2058
2059 sub handle_readall {
2060   # Pass in a glob reference to a file handle.
2061   # Read all its contents and return them as a string.
2062   my $fh_glob_ref = shift;
2063   local $/ = undef;
2064   return <$fh_glob_ref>;
2065 }
2066
2067 sub tar_filename_n {
2068   my $n = shift;
2069   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2070 }
2071
2072 sub add_git_archive {
2073   # Pass in a git archive command as a string or list, a la system().
2074   # This method will save its output to be included in the archive sent to the
2075   # build script.
2076   my $git_input;
2077   $git_tar_count++;
2078   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2079     croak("Failed to save git archive: $!");
2080   }
2081   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2082   close($git_input);
2083   waitpid($git_pid, 0);
2084   close(GIT_ARCHIVE);
2085   if ($?) {
2086     croak("Failed to save git archive: git exited " . exit_status_s($?));
2087   }
2088 }
2089
2090 sub combined_git_archive {
2091   # Combine all saved tar archives into a single archive, then return its
2092   # contents in a string.  Return undef if no archives have been saved.
2093   if ($git_tar_count < 1) {
2094     return undef;
2095   }
2096   my $base_tar_name = tar_filename_n(1);
2097   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2098     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2099     if ($tar_exit != 0) {
2100       croak("Error preparing build archive: tar -A exited " .
2101             exit_status_s($tar_exit));
2102     }
2103   }
2104   if (!open(GIT_TAR, "<", $base_tar_name)) {
2105     croak("Could not open build archive: $!");
2106   }
2107   my $tar_contents = handle_readall(\*GIT_TAR);
2108   close(GIT_TAR);
2109   return $tar_contents;
2110 }
2111
2112 sub set_nonblocking {
2113   my $fh = shift;
2114   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2115   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2116 }
2117
2118 __DATA__
2119 #!/usr/bin/env perl
2120 #
2121 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2122 # server invokes this script on individual compute nodes, or localhost if we're
2123 # running a job locally.  It gets called in two modes:
2124 #
2125 # * No arguments: Installation mode.  Read a tar archive from the DATA
2126 #   file handle; it includes the Crunch script's source code, and
2127 #   maybe SDKs as well.  Those should be installed in the proper
2128 #   locations.  This runs outside of any Docker container, so don't try to
2129 #   introspect Crunch's runtime environment.
2130 #
2131 # * With arguments: Crunch script run mode.  This script should set up the
2132 #   environment, then run the command specified in the arguments.  This runs
2133 #   inside any Docker container.
2134
2135 use Fcntl ':flock';
2136 use File::Path qw( make_path remove_tree );
2137 use POSIX qw(getcwd);
2138
2139 use constant TASK_TEMPFAIL => 111;
2140
2141 # Map SDK subdirectories to the path environments they belong to.
2142 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2143
2144 my $destdir = $ENV{"CRUNCH_SRC"};
2145 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2146 my $repo = $ENV{"CRUNCH_SRC_URL"};
2147 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2148 my $job_work = $ENV{"JOB_WORK"};
2149 my $task_work = $ENV{"TASK_WORK"};
2150
2151 open(STDOUT_ORIG, ">&", STDOUT);
2152 open(STDERR_ORIG, ">&", STDERR);
2153
2154 for my $dir ($destdir, $job_work, $task_work) {
2155   if ($dir) {
2156     make_path $dir;
2157     -e $dir or die "Failed to create temporary directory ($dir): $!";
2158   }
2159 }
2160
2161 if ($task_work) {
2162   remove_tree($task_work, {keep_root => 1});
2163 }
2164
2165 ### Crunch script run mode
2166 if (@ARGV) {
2167   # We want to do routine logging during task 0 only.  This gives the user
2168   # the information they need, but avoids repeating the information for every
2169   # task.
2170   my $Log;
2171   if ($ENV{TASK_SEQUENCE} eq "0") {
2172     $Log = sub {
2173       my $msg = shift;
2174       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2175     };
2176   } else {
2177     $Log = sub { };
2178   }
2179
2180   my $python_src = "$install_dir/python";
2181   my $venv_dir = "$job_work/.arvados.venv";
2182   my $venv_built = -e "$venv_dir/bin/activate";
2183   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2184     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2185                  "--python=python2.7", $venv_dir);
2186     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2187     $venv_built = 1;
2188     $Log->("Built Python SDK virtualenv");
2189   }
2190
2191   my @pysdk_version_cmd = ("python", "-c",
2192     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2193   if ($venv_built) {
2194     $Log->("Running in Python SDK virtualenv");
2195     @pysdk_version_cmd = ();
2196     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2197     @ARGV = ("/bin/sh", "-ec",
2198              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2199   } elsif (-d $python_src) {
2200     $Log->("Warning: virtualenv not found inside Docker container default " .
2201            "\$PATH. Can't install Python SDK.");
2202   }
2203
2204   if (@pysdk_version_cmd) {
2205     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2206     my $pysdk_version = <$pysdk_version_pipe>;
2207     close($pysdk_version_pipe);
2208     if ($? == 0) {
2209       chomp($pysdk_version);
2210       $Log->("Using Arvados SDK version $pysdk_version");
2211     } else {
2212       # A lot could've gone wrong here, but pretty much all of it means that
2213       # Python won't be able to load the Arvados SDK.
2214       $Log->("Warning: Arvados SDK not found");
2215     }
2216   }
2217
2218   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2219     my $sdk_path = "$install_dir/$sdk_dir";
2220     if (-d $sdk_path) {
2221       if ($ENV{$sdk_envkey}) {
2222         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2223       } else {
2224         $ENV{$sdk_envkey} = $sdk_path;
2225       }
2226       $Log->("Arvados SDK added to %s", $sdk_envkey);
2227     }
2228   }
2229
2230   exec(@ARGV);
2231   die "Cannot exec `@ARGV`: $!";
2232 }
2233
2234 ### Installation mode
2235 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2236 flock L, LOCK_EX;
2237 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2238   # This exact git archive (source + arvados sdk) is already installed
2239   # here, so there's no need to reinstall it.
2240
2241   # We must consume our DATA section, though: otherwise the process
2242   # feeding it to us will get SIGPIPE.
2243   my $buf;
2244   while (read(DATA, $buf, 65536)) { }
2245
2246   exit(0);
2247 }
2248
2249 unlink "$destdir.archive_hash";
2250 mkdir $destdir;
2251
2252 do {
2253   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2254   local $SIG{PIPE} = "IGNORE";
2255   warn "Extracting archive: $archive_hash\n";
2256   # --ignore-zeros is necessary sometimes: depending on how much NUL
2257   # padding tar -A put on our combined archive (which in turn depends
2258   # on the length of the component archives) tar without
2259   # --ignore-zeros will exit before consuming stdin and cause close()
2260   # to fail on the resulting SIGPIPE.
2261   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2262     die "Error launching 'tar -xC $destdir': $!";
2263   }
2264   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2265   # get SIGPIPE.  We must feed it data incrementally.
2266   my $tar_input;
2267   while (read(DATA, $tar_input, 65536)) {
2268     print TARX $tar_input;
2269   }
2270   if(!close(TARX)) {
2271     die "'tar -xC $destdir' exited $?: $!";
2272   }
2273 };
2274
2275 mkdir $install_dir;
2276
2277 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2278 if (-d $sdk_root) {
2279   foreach my $sdk_lang (("python",
2280                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2281     if (-d "$sdk_root/$sdk_lang") {
2282       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2283         die "Failed to install $sdk_lang SDK: $!";
2284       }
2285     }
2286   }
2287 }
2288
2289 my $python_dir = "$install_dir/python";
2290 if ((-d $python_dir) and can_run("python2.7")) {
2291   open(my $egg_info_pipe, "-|",
2292        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2293   my @egg_info_errors = <$egg_info_pipe>;
2294   close($egg_info_pipe);
2295
2296   if ($?) {
2297     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2298       # egg_info apparently failed because it couldn't ask git for a build tag.
2299       # Specify no build tag.
2300       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2301       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2302       close($pysdk_cfg);
2303     } else {
2304       my $egg_info_exit = $? >> 8;
2305       foreach my $errline (@egg_info_errors) {
2306         warn $errline;
2307       }
2308       warn "python setup.py egg_info failed: exit $egg_info_exit";
2309       exit ($egg_info_exit || 1);
2310     }
2311   }
2312 }
2313
2314 # Hide messages from the install script (unless it fails: shell_or_die
2315 # will show $destdir.log in that case).
2316 open(STDOUT, ">>", "$destdir.log");
2317 open(STDERR, ">&", STDOUT);
2318
2319 if (-e "$destdir/crunch_scripts/install") {
2320     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2321 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2322     # Old version
2323     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2324 } elsif (-e "./install.sh") {
2325     shell_or_die (undef, "./install.sh", $install_dir);
2326 }
2327
2328 if ($archive_hash) {
2329     unlink "$destdir.archive_hash.new";
2330     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2331     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2332 }
2333
2334 close L;
2335
2336 sub can_run {
2337   my $command_name = shift;
2338   open(my $which, "-|", "which", $command_name);
2339   while (<$which>) { }
2340   close($which);
2341   return ($? == 0);
2342 }
2343
2344 sub shell_or_die
2345 {
2346   my $exitcode = shift;
2347
2348   if ($ENV{"DEBUG"}) {
2349     print STDERR "@_\n";
2350   }
2351   if (system (@_) != 0) {
2352     my $err = $!;
2353     my $code = $?;
2354     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2355     open STDERR, ">&STDERR_ORIG";
2356     system ("cat $destdir.log >&2");
2357     warn "@_ failed ($err): $exitstatus";
2358     if (defined($exitcode)) {
2359       exit $exitcode;
2360     }
2361     else {
2362       exit (($code >> 8) || 1);
2363     }
2364   }
2365 }
2366
2367 __DATA__