8099: 7263: Merge branch 'hgi/7263-even-better-busy-behavior' of github.com:wtsi...
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 my $docker_run_args = "";
131 GetOptions('force-unlock' => \$force_unlock,
132            'git-dir=s' => \$git_dir,
133            'job=s' => \$jobspec,
134            'job-api-token=s' => \$job_api_token,
135            'no-clear-tmp' => \$no_clear_tmp,
136            'resume-stash=s' => \$resume_stash,
137            'docker-bin=s' => \$docker_bin,
138            'docker-run-args=s' => \$docker_run_args,
139     );
140
141 if (defined $job_api_token) {
142   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
143 }
144
145 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
146
147
148 $SIG{'USR1'} = sub
149 {
150   $main::ENV{CRUNCH_DEBUG} = 1;
151 };
152 $SIG{'USR2'} = sub
153 {
154   $main::ENV{CRUNCH_DEBUG} = 0;
155 };
156
157 my $arv = Arvados->new('apiVersion' => 'v1');
158
159 my $Job;
160 my $job_id;
161 my $dbh;
162 my $sth;
163 my @jobstep;
164
165 my $local_job;
166 if ($jobspec =~ /^[-a-z\d]+$/)
167 {
168   # $jobspec is an Arvados UUID, not a JSON job specification
169   $Job = api_call("jobs/get", uuid => $jobspec);
170   $local_job = 0;
171 }
172 else
173 {
174   $local_job = JSON::decode_json($jobspec);
175 }
176
177
178 # Make sure our workers (our slurm nodes, localhost, or whatever) are
179 # at least able to run basic commands: they aren't down or severely
180 # misconfigured.
181 my $cmd = ['true'];
182 if (($Job || $local_job)->{docker_image_locator}) {
183   $cmd = [$docker_bin, 'ps', '-q'];
184 }
185 Log(undef, "Sanity check is `@$cmd`");
186 my ($exited, $stdout, $stderr) = srun_sync(
187   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
188   $cmd,
189   {label => "sanity check"});
190 if ($exited != 0) {
191   Log(undef, "Sanity check failed: ".exit_status_s($exited));
192   exit EX_TEMPFAIL;
193 }
194 Log(undef, "Sanity check OK");
195
196
197 my $User = api_call("users/current");
198
199 if (!$local_job) {
200   if (!$force_unlock) {
201     # Claim this job, and make sure nobody else does
202     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
203     if ($@) {
204       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
205       exit EX_TEMPFAIL;
206     };
207   }
208 }
209 else
210 {
211   if (!$resume_stash)
212   {
213     map { croak ("No $_ specified") unless $local_job->{$_} }
214     qw(script script_version script_parameters);
215   }
216
217   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
218   $local_job->{'started_at'} = gmtime;
219   $local_job->{'state'} = 'Running';
220
221   $Job = api_call("jobs/create", job => $local_job);
222 }
223 $job_id = $Job->{'uuid'};
224
225 my $keep_logfile = $job_id . '.log.txt';
226 log_writer_start($keep_logfile);
227
228 $Job->{'runtime_constraints'} ||= {};
229 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
230 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
231
232 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
233 if ($? == 0) {
234   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
235   chomp($gem_versions);
236   chop($gem_versions);  # Closing parentheses
237 } else {
238   $gem_versions = "";
239 }
240 Log(undef,
241     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
242
243 Log (undef, "check slurm allocation");
244 my @slot;
245 my @node;
246 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
247 my @sinfo;
248 if (!$have_slurm)
249 {
250   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
251   push @sinfo, "$localcpus localhost";
252 }
253 if (exists $ENV{SLURM_NODELIST})
254 {
255   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
256 }
257 foreach (@sinfo)
258 {
259   my ($ncpus, $slurm_nodelist) = split;
260   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
261
262   my @nodelist;
263   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
264   {
265     my $nodelist = $1;
266     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
267     {
268       my $ranges = $1;
269       foreach (split (",", $ranges))
270       {
271         my ($a, $b);
272         if (/(\d+)-(\d+)/)
273         {
274           $a = $1;
275           $b = $2;
276         }
277         else
278         {
279           $a = $_;
280           $b = $_;
281         }
282         push @nodelist, map {
283           my $n = $nodelist;
284           $n =~ s/\[[-,\d]+\]/$_/;
285           $n;
286         } ($a..$b);
287       }
288     }
289     else
290     {
291       push @nodelist, $nodelist;
292     }
293   }
294   foreach my $nodename (@nodelist)
295   {
296     Log (undef, "node $nodename - $ncpus slots");
297     my $node = { name => $nodename,
298                  ncpus => $ncpus,
299                  # The number of consecutive times a task has been dispatched
300                  # to this node and failed.
301                  losing_streak => 0,
302                  # The number of consecutive times that SLURM has reported
303                  # a node failure since the last successful task.
304                  fail_count => 0,
305                  # Don't dispatch work to this node until this time
306                  # (in seconds since the epoch) has passed.
307                  hold_until => 0 };
308     foreach my $cpu (1..$ncpus)
309     {
310       push @slot, { node => $node,
311                     cpu => $cpu };
312     }
313   }
314   push @node, @nodelist;
315 }
316
317
318
319 # Ensure that we get one jobstep running on each allocated node before
320 # we start overloading nodes with concurrent steps
321
322 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
323
324
325 $Job->update_attributes(
326   'tasks_summary' => { 'failed' => 0,
327                        'todo' => 1,
328                        'running' => 0,
329                        'done' => 0 });
330
331 Log (undef, "start");
332 $SIG{'INT'} = sub { $main::please_freeze = 1; };
333 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
334 $SIG{'TERM'} = \&croak;
335 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
336 $SIG{'ALRM'} = sub { $main::please_info = 1; };
337 $SIG{'CONT'} = sub { $main::please_continue = 1; };
338 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
339
340 $main::please_freeze = 0;
341 $main::please_info = 0;
342 $main::please_continue = 0;
343 $main::please_refresh = 0;
344 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
345
346 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
347 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
348 $ENV{"JOB_UUID"} = $job_id;
349
350
351 my @jobstep_todo = ();
352 my @jobstep_done = ();
353 my @jobstep_tomerge = ();
354 my $jobstep_tomerge_level = 0;
355 my $squeue_checked = 0;
356 my $latest_refresh = scalar time;
357
358
359
360 if (defined $Job->{thawedfromkey})
361 {
362   thaw ($Job->{thawedfromkey});
363 }
364 else
365 {
366   my $first_task = api_call("job_tasks/create", job_task => {
367     'job_uuid' => $Job->{'uuid'},
368     'sequence' => 0,
369     'qsequence' => 0,
370     'parameters' => {},
371   });
372   push @jobstep, { 'level' => 0,
373                    'failures' => 0,
374                    'arvados_task' => $first_task,
375                  };
376   push @jobstep_todo, 0;
377 }
378
379
380 if (!$have_slurm)
381 {
382   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
383 }
384
385 my $build_script = handle_readall(\*DATA);
386 my $nodelist = join(",", @node);
387 my $git_tar_count = 0;
388
389 if (!defined $no_clear_tmp) {
390   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
391   # up work directories crunch_tmp/work, crunch_tmp/opt,
392   # crunch_tmp/src*.
393   #
394   # TODO: When #5036 is done and widely deployed, we can limit mount's
395   # -t option to simply fuse.keep.
396   my ($exited, $stdout, $stderr) = srun_sync(
397     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398     ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
399     {label => "clean work dirs"});
400   if ($exited != 0) {
401     exit(EX_RETRY_UNLOCKED);
402   }
403 }
404
405 # If this job requires a Docker image, install that.
406 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
407 if ($docker_locator = $Job->{docker_image_locator}) {
408   Log (undef, "Install docker image $docker_locator");
409   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
410   if (!$docker_hash)
411   {
412     croak("No Docker image hash found from locator $docker_locator");
413   }
414   Log (undef, "docker image hash is $docker_hash");
415   $docker_stream =~ s/^\.//;
416   my $docker_install_script = qq{
417 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
418     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
419 fi
420 };
421
422   my ($exited, $stdout, $stderr) = srun_sync(
423     ["srun", "--nodelist=" . join(',', @node)],
424     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
425     {label => "load docker image"});
426   if ($exited != 0)
427   {
428     exit(EX_RETRY_UNLOCKED);
429   }
430
431   # Determine whether this version of Docker supports memory+swap limits.
432   ($exited, $stdout, $stderr) = srun_sync(
433     ["srun", "--nodelist=" . $node[0]],
434     [$docker_bin, 'run', '--help'],
435     {label => "check --memory-swap feature"});
436   $docker_limitmem = ($stdout =~ /--memory-swap/);
437
438   # Find a non-root Docker user to use.
439   # Tries the default user for the container, then 'crunch', then 'nobody',
440   # testing for whether the actual user id is non-zero.  This defends against
441   # mistakes but not malice, but we intend to harden the security in the future
442   # so we don't want anyone getting used to their jobs running as root in their
443   # Docker containers.
444   my @tryusers = ("", "crunch", "nobody");
445   foreach my $try_user (@tryusers) {
446     my $label;
447     my $try_user_arg;
448     if ($try_user eq "") {
449       $label = "check whether default user is UID 0";
450       $try_user_arg = "";
451     } else {
452       $label = "check whether user '$try_user' is UID 0";
453       $try_user_arg = "--user=$try_user";
454     }
455     my ($exited, $stdout, $stderr) = srun_sync(
456       ["srun", "--nodelist=" . $node[0]],
457       ["/bin/sh", "-ec",
458        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
459       {label => $label});
460     chomp($stdout);
461     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
462       $dockeruserarg = $try_user_arg;
463       if ($try_user eq "") {
464         Log(undef, "Container will run with default user");
465       } else {
466         Log(undef, "Container will run with $dockeruserarg");
467       }
468       last;
469     }
470   }
471
472   if (!defined $dockeruserarg) {
473     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
474   }
475
476   if ($Job->{arvados_sdk_version}) {
477     # The job also specifies an Arvados SDK version.  Add the SDKs to the
478     # tar file for the build script to install.
479     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
480                        $Job->{arvados_sdk_version}));
481     add_git_archive("git", "--git-dir=$git_dir", "archive",
482                     "--prefix=.arvados.sdk/",
483                     $Job->{arvados_sdk_version}, "sdk");
484   }
485 }
486
487 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
488   # If script_version looks like an absolute path, *and* the --git-dir
489   # argument was not given -- which implies we were not invoked by
490   # crunch-dispatch -- we will use the given path as a working
491   # directory instead of resolving script_version to a git commit (or
492   # doing anything else with git).
493   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
494   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
495 }
496 else {
497   # Resolve the given script_version to a git commit sha1. Also, if
498   # the repository is remote, clone it into our local filesystem: this
499   # ensures "git archive" will work, and is necessary to reliably
500   # resolve a symbolic script_version like "master^".
501   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
502
503   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
504
505   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
506
507   # If we're running under crunch-dispatch, it will have already
508   # pulled the appropriate source tree into its own repository, and
509   # given us that repo's path as $git_dir.
510   #
511   # If we're running a "local" job, we might have to fetch content
512   # from a remote repository.
513   #
514   # (Currently crunch-dispatch gives a local path with --git-dir, but
515   # we might as well accept URLs there too in case it changes its
516   # mind.)
517   my $repo = $git_dir || $Job->{'repository'};
518
519   # Repository can be remote or local. If remote, we'll need to fetch it
520   # to a local dir before doing `git log` et al.
521   my $repo_location;
522
523   if ($repo =~ m{://|^[^/]*:}) {
524     # $repo is a git url we can clone, like git:// or https:// or
525     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
526     # not recognized here because distinguishing that from a local
527     # path is too fragile. If you really need something strange here,
528     # use the ssh:// form.
529     $repo_location = 'remote';
530   } elsif ($repo =~ m{^\.*/}) {
531     # $repo is a local path to a git index. We'll also resolve ../foo
532     # to ../foo/.git if the latter is a directory. To help
533     # disambiguate local paths from named hosted repositories, this
534     # form must be given as ./ or ../ if it's a relative path.
535     if (-d "$repo/.git") {
536       $repo = "$repo/.git";
537     }
538     $repo_location = 'local';
539   } else {
540     # $repo is none of the above. It must be the name of a hosted
541     # repository.
542     my $arv_repo_list = api_call("repositories/list",
543                                  'filters' => [['name','=',$repo]]);
544     my @repos_found = @{$arv_repo_list->{'items'}};
545     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
546     if ($n_found > 0) {
547       Log(undef, "Repository '$repo' -> "
548           . join(", ", map { $_->{'uuid'} } @repos_found));
549     }
550     if ($n_found != 1) {
551       croak("Error: Found $n_found repositories with name '$repo'.");
552     }
553     $repo = $repos_found[0]->{'fetch_url'};
554     $repo_location = 'remote';
555   }
556   Log(undef, "Using $repo_location repository '$repo'");
557   $ENV{"CRUNCH_SRC_URL"} = $repo;
558
559   # Resolve given script_version (we'll call that $treeish here) to a
560   # commit sha1 ($commit).
561   my $treeish = $Job->{'script_version'};
562   my $commit;
563   if ($repo_location eq 'remote') {
564     # We minimize excess object-fetching by re-using the same bare
565     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
566     # just keep adding remotes to it as needed.
567     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
568     my $gitcmd = "git --git-dir=\Q$local_repo\E";
569
570     # Set up our local repo for caching remote objects, making
571     # archives, etc.
572     if (!-d $local_repo) {
573       make_path($local_repo) or croak("Error: could not create $local_repo");
574     }
575     # This works (exits 0 and doesn't delete fetched objects) even
576     # if $local_repo is already initialized:
577     `$gitcmd init --bare`;
578     if ($?) {
579       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
580     }
581
582     # If $treeish looks like a hash (or abbrev hash) we look it up in
583     # our local cache first, since that's cheaper. (We don't want to
584     # do that with tags/branches though -- those change over time, so
585     # they should always be resolved by the remote repo.)
586     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
587       # Hide stderr because it's normal for this to fail:
588       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
589       if ($? == 0 &&
590           # Careful not to resolve a branch named abcdeff to commit 1234567:
591           $sha1 =~ /^$treeish/ &&
592           $sha1 =~ /^([0-9a-f]{40})$/s) {
593         $commit = $1;
594         Log(undef, "Commit $commit already present in $local_repo");
595       }
596     }
597
598     if (!defined $commit) {
599       # If $treeish isn't just a hash or abbrev hash, or isn't here
600       # yet, we need to fetch the remote to resolve it correctly.
601
602       # First, remove all local heads. This prevents a name that does
603       # not exist on the remote from resolving to (or colliding with)
604       # a previously fetched branch or tag (possibly from a different
605       # remote).
606       remove_tree("$local_repo/refs/heads", {keep_root => 1});
607
608       Log(undef, "Fetching objects from $repo to $local_repo");
609       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
610       if ($?) {
611         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
612       }
613     }
614
615     # Now that the data is all here, we will use our local repo for
616     # the rest of our git activities.
617     $repo = $local_repo;
618   }
619
620   my $gitcmd = "git --git-dir=\Q$repo\E";
621   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
622   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
623     croak("`$gitcmd rev-list` exited "
624           .exit_status_s($?)
625           .", '$treeish' not found, giving up");
626   }
627   $commit = $1;
628   Log(undef, "Version $treeish is commit $commit");
629
630   if ($commit ne $Job->{'script_version'}) {
631     # Record the real commit id in the database, frozentokey, logs,
632     # etc. -- instead of an abbreviation or a branch name which can
633     # become ambiguous or point to a different commit in the future.
634     if (!$Job->update_attributes('script_version' => $commit)) {
635       croak("Error: failed to update job's script_version attribute");
636     }
637   }
638
639   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
640   add_git_archive("$gitcmd archive ''\Q$commit\E");
641 }
642
643 my $git_archive = combined_git_archive();
644 if (!defined $git_archive) {
645   Log(undef, "Skip install phase (no git archive)");
646   if ($have_slurm) {
647     Log(undef, "Warning: This probably means workers have no source tree!");
648   }
649 }
650 else {
651   my $exited;
652   my $install_script_tries_left = 3;
653   for (my $attempts = 0; $attempts < 3; $attempts++) {
654     my @srunargs = ("srun",
655                     "--nodelist=$nodelist",
656                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
657     my @execargs = ("sh", "-c",
658                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
659
660     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
661     my ($stdout, $stderr);
662     ($exited, $stdout, $stderr) = srun_sync(
663       \@srunargs, \@execargs,
664       {label => "run install script on all workers"},
665       $build_script . $git_archive);
666
667     my $stderr_anything_from_script = 0;
668     for my $line (split(/\n/, $stderr)) {
669       if ($line !~ /^(srun: error: |starting: \[)/) {
670         $stderr_anything_from_script = 1;
671       }
672     }
673
674     last if $exited == 0 || $main::please_freeze;
675
676     # If the install script fails but doesn't print an error message,
677     # the next thing anyone is likely to do is just run it again in
678     # case it was a transient problem like "slurm communication fails
679     # because the network isn't reliable enough". So we'll just do
680     # that ourselves (up to 3 attempts in total). OTOH, if there is an
681     # error message, the problem is more likely to have a real fix and
682     # we should fail the job so the fixing process can start, instead
683     # of doing 2 more attempts.
684     last if $stderr_anything_from_script;
685   }
686
687   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
688     unlink($tar_filename);
689   }
690
691   if ($exited != 0) {
692     croak("Giving up");
693   }
694 }
695
696 foreach (qw (script script_version script_parameters runtime_constraints))
697 {
698   Log (undef,
699        "$_ " .
700        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
701 }
702 foreach (split (/\n/, $Job->{knobs}))
703 {
704   Log (undef, "knob " . $_);
705 }
706
707
708
709 $main::success = undef;
710
711
712
713 ONELEVEL:
714
715 my $thisround_succeeded = 0;
716 my $thisround_failed = 0;
717 my $thisround_failed_multiple = 0;
718 my $working_slot_count = scalar(@slot);
719
720 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
721                        or $a <=> $b } @jobstep_todo;
722 my $level = $jobstep[$jobstep_todo[0]]->{level};
723
724 my $initial_tasks_this_level = 0;
725 foreach my $id (@jobstep_todo) {
726   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
727 }
728
729 # If the number of tasks scheduled at this level #T is smaller than the number
730 # of slots available #S, only use the first #T slots, or the first slot on
731 # each node, whichever number is greater.
732 #
733 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
734 # based on these numbers.  Using fewer slots makes more resources available
735 # to each individual task, which should normally be a better strategy when
736 # there are fewer of them running with less parallelism.
737 #
738 # Note that this calculation is not redone if the initial tasks at
739 # this level queue more tasks at the same level.  This may harm
740 # overall task throughput for that level.
741 my @freeslot;
742 if ($initial_tasks_this_level < @node) {
743   @freeslot = (0..$#node);
744 } elsif ($initial_tasks_this_level < @slot) {
745   @freeslot = (0..$initial_tasks_this_level - 1);
746 } else {
747   @freeslot = (0..$#slot);
748 }
749 my $round_num_freeslots = scalar(@freeslot);
750 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
751
752 my %round_max_slots = ();
753 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
754   my $this_slot = $slot[$freeslot[$ii]];
755   my $node_name = $this_slot->{node}->{name};
756   $round_max_slots{$node_name} ||= $this_slot->{cpu};
757   last if (scalar(keys(%round_max_slots)) >= @node);
758 }
759
760 Log(undef, "start level $level with $round_num_freeslots slots");
761 my @holdslot;
762 my %reader;
763 my $progress_is_dirty = 1;
764 my $progress_stats_updated = 0;
765
766 update_progress_stats();
767
768
769 THISROUND:
770 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
771 {
772   # Don't create new tasks if we already know the job's final result.
773   last if defined($main::success);
774
775   my $id = $jobstep_todo[$todo_ptr];
776   my $Jobstep = $jobstep[$id];
777   if ($Jobstep->{level} != $level)
778   {
779     next;
780   }
781
782   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
783   set_nonblocking($reader{$id});
784
785   my $childslot = $freeslot[0];
786   my $childnode = $slot[$childslot]->{node};
787   my $childslotname = join (".",
788                             $slot[$childslot]->{node}->{name},
789                             $slot[$childslot]->{cpu});
790
791   my $childpid = fork();
792   if ($childpid == 0)
793   {
794     $SIG{'INT'} = 'DEFAULT';
795     $SIG{'QUIT'} = 'DEFAULT';
796     $SIG{'TERM'} = 'DEFAULT';
797
798     foreach (values (%reader))
799     {
800       close($_);
801     }
802     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
803     open(STDOUT,">&writer");
804     open(STDERR,">&writer");
805
806     undef $dbh;
807     undef $sth;
808
809     delete $ENV{"GNUPGHOME"};
810     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
811     $ENV{"TASK_QSEQUENCE"} = $id;
812     $ENV{"TASK_SEQUENCE"} = $level;
813     $ENV{"JOB_SCRIPT"} = $Job->{script};
814     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
815       $param =~ tr/a-z/A-Z/;
816       $ENV{"JOB_PARAMETER_$param"} = $value;
817     }
818     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
819     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
820     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
821     $ENV{"HOME"} = $ENV{"TASK_WORK"};
822     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
823     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
824     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
825
826     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
827
828     $ENV{"GZIP"} = "-n";
829
830     my @srunargs = (
831       "srun",
832       "--nodelist=".$childnode->{name},
833       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
834       "--job-name=$job_id.$id.$$",
835         );
836
837     my $stdbuf = " stdbuf --output=0 --error=0 ";
838
839     my $arv_file_cache = "";
840     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
841       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
842     }
843
844     my $command =
845         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
846         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
847         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
848         # These environment variables get used explicitly later in
849         # $command.  No tool is expected to read these values directly.
850         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
851         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
852         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
853         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
854
855     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
856     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
857     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
858
859     if ($docker_hash)
860     {
861       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
862       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
863       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
864       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
865       # We only set memory limits if Docker lets us limit both memory and swap.
866       # Memory limits alone have been supported longer, but subprocesses tend
867       # to get SIGKILL if they exceed that without any swap limit set.
868       # See #5642 for additional background.
869       if ($docker_limitmem) {
870         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
871       }
872
873       # The source tree and $destdir directory (which we have
874       # installed on the worker host) are available in the container,
875       # under the same path.
876       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
877       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
878
879       # Currently, we make the "by_pdh" directory in arv-mount's mount
880       # point appear at /keep inside the container (instead of using
881       # the same path as the host like we do with CRUNCH_SRC and
882       # CRUNCH_INSTALL). However, crunch scripts and utilities must
883       # not rely on this. They must use $TASK_KEEPMOUNT.
884       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
885       $ENV{TASK_KEEPMOUNT} = "/keep";
886
887       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
888       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
889       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
890
891       # TASK_WORK is almost exactly like a docker data volume: it
892       # starts out empty, is writable, and persists until no
893       # containers use it any more. We don't use --volumes-from to
894       # share it with other containers: it is only accessible to this
895       # task, and it goes away when this task stops.
896       #
897       # However, a docker data volume is writable only by root unless
898       # the mount point already happens to exist in the container with
899       # different permissions. Therefore, we [1] assume /tmp already
900       # exists in the image and is writable by the crunch user; [2]
901       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
902       # writable if they are created by docker while setting up the
903       # other --volumes); and [3] create $TASK_WORK inside the
904       # container using $build_script.
905       $command .= "--volume=/tmp ";
906       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
907       $ENV{"HOME"} = $ENV{"TASK_WORK"};
908       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
909
910       # TODO: Share a single JOB_WORK volume across all task
911       # containers on a given worker node, and delete it when the job
912       # ends (and, in case that doesn't work, when the next job
913       # starts).
914       #
915       # For now, use the same approach as TASK_WORK above.
916       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
917
918       while (my ($env_key, $env_val) = each %ENV)
919       {
920         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
921           $command .= "--env=\Q$env_key=$env_val\E ";
922         }
923       }
924       $command .= "--env=\QHOME=$ENV{HOME}\E ";
925       $command .= "\Q$docker_hash\E ";
926
927       if ($Job->{arvados_sdk_version}) {
928         $command .= $stdbuf;
929         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
930       } else {
931         $command .= "/bin/sh -c \'python -c " .
932             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
933             ">&2 2>/dev/null; " .
934             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
935             "if which stdbuf >/dev/null ; then " .
936             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
937             " else " .
938             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
939             " fi\'";
940       }
941     } else {
942       # Non-docker run
943       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
944       $command .= $stdbuf;
945       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
946     }
947
948     my @execargs = ('bash', '-c', $command);
949     srun (\@srunargs, \@execargs, undef, $build_script);
950     # exec() failed, we assume nothing happened.
951     die "srun() failed on build script\n";
952   }
953   close("writer");
954   if (!defined $childpid)
955   {
956     close $reader{$id};
957     delete $reader{$id};
958     next;
959   }
960   shift @freeslot;
961   $proc{$childpid} = {
962     jobstepidx => $id,
963     time => time,
964     slot => $childslot,
965     jobstepname => "$job_id.$id.$childpid",
966   };
967   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
968   $slot[$childslot]->{pid} = $childpid;
969
970   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
971   Log ($id, "child $childpid started on $childslotname");
972   $Jobstep->{starttime} = time;
973   $Jobstep->{node} = $childnode->{name};
974   $Jobstep->{slotindex} = $childslot;
975   delete $Jobstep->{stderr};
976   delete $Jobstep->{finishtime};
977   delete $Jobstep->{tempfail};
978
979   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
980   $Jobstep->{'arvados_task'}->save;
981
982   splice @jobstep_todo, $todo_ptr, 1;
983   --$todo_ptr;
984
985   $progress_is_dirty = 1;
986
987   while (!@freeslot
988          ||
989          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
990   {
991     last THISROUND if $main::please_freeze;
992     if ($main::please_info)
993     {
994       $main::please_info = 0;
995       freeze();
996       create_output_collection();
997       save_meta(1);
998       update_progress_stats();
999     }
1000     my $gotsome
1001         = readfrompipes ()
1002         + reapchildren ();
1003     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1004     {
1005       check_refresh_wanted();
1006       check_squeue();
1007       update_progress_stats();
1008     }
1009     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1010     {
1011       update_progress_stats();
1012     }
1013     if (!$gotsome) {
1014       select (undef, undef, undef, 0.1);
1015     }
1016     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1017                                         $_->{node}->{hold_count} < 4 } @slot);
1018     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1019         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1020     {
1021       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1022           .($thisround_failed+$thisround_succeeded)
1023           .") -- giving up on this round";
1024       Log (undef, $message);
1025       last THISROUND;
1026     }
1027
1028     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1029     for (my $i=$#freeslot; $i>=0; $i--) {
1030       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1031         push @holdslot, (splice @freeslot, $i, 1);
1032       }
1033     }
1034     for (my $i=$#holdslot; $i>=0; $i--) {
1035       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1036         push @freeslot, (splice @holdslot, $i, 1);
1037       }
1038     }
1039
1040     # give up if no nodes are succeeding
1041     if ($working_slot_count < 1) {
1042       Log(undef, "Every node has failed -- giving up");
1043       last THISROUND;
1044     }
1045   }
1046 }
1047
1048
1049 push @freeslot, splice @holdslot;
1050 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1051
1052
1053 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1054 while (%proc)
1055 {
1056   if ($main::please_continue) {
1057     $main::please_continue = 0;
1058     goto THISROUND;
1059   }
1060   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1061   readfrompipes ();
1062   if (!reapchildren())
1063   {
1064     check_refresh_wanted();
1065     check_squeue();
1066     update_progress_stats();
1067     select (undef, undef, undef, 0.1);
1068     killem (keys %proc) if $main::please_freeze;
1069   }
1070 }
1071
1072 update_progress_stats();
1073 freeze_if_want_freeze();
1074
1075
1076 if (!defined $main::success)
1077 {
1078   if (!@jobstep_todo) {
1079     $main::success = 1;
1080   } elsif ($working_slot_count < 1) {
1081     save_output_collection();
1082     save_meta();
1083     exit(EX_RETRY_UNLOCKED);
1084   } elsif ($thisround_succeeded == 0 &&
1085            ($thisround_failed == 0 || $thisround_failed > 4)) {
1086     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1087     Log (undef, $message);
1088     $main::success = 0;
1089   }
1090 }
1091
1092 goto ONELEVEL if !defined $main::success;
1093
1094
1095 release_allocation();
1096 freeze();
1097 my $collated_output = save_output_collection();
1098 Log (undef, "finish");
1099
1100 save_meta();
1101
1102 my $final_state;
1103 if ($collated_output && $main::success) {
1104   $final_state = 'Complete';
1105 } else {
1106   $final_state = 'Failed';
1107 }
1108 $Job->update_attributes('state' => $final_state);
1109
1110 exit (($final_state eq 'Complete') ? 0 : 1);
1111
1112
1113
1114 sub update_progress_stats
1115 {
1116   $progress_stats_updated = time;
1117   return if !$progress_is_dirty;
1118   my ($todo, $done, $running) = (scalar @jobstep_todo,
1119                                  scalar @jobstep_done,
1120                                  scalar keys(%proc));
1121   $Job->{'tasks_summary'} ||= {};
1122   $Job->{'tasks_summary'}->{'todo'} = $todo;
1123   $Job->{'tasks_summary'}->{'done'} = $done;
1124   $Job->{'tasks_summary'}->{'running'} = $running;
1125   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1126   Log (undef, "status: $done done, $running running, $todo todo");
1127   $progress_is_dirty = 0;
1128 }
1129
1130
1131
1132 sub reapchildren
1133 {
1134   my $children_reaped = 0;
1135   while ((my $pid = waitpid (-1, WNOHANG)) > 0)
1136   {
1137     my $childstatus = $?;
1138
1139     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1140                     . "."
1141                     . $slot[$proc{$pid}->{slot}]->{cpu});
1142     my $jobstepidx = $proc{$pid}->{jobstepidx};
1143
1144     if (!WIFEXITED($childstatus))
1145     {
1146       # child did not exit (may be temporarily stopped)
1147       Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
1148       next;
1149     }
1150
1151     $children_reaped++;
1152     my $elapsed = time - $proc{$pid}->{time};
1153     my $Jobstep = $jobstep[$jobstepidx];
1154
1155     my $exitvalue = $childstatus >> 8;
1156     my $exitinfo = "exit ".exit_status_s($childstatus);
1157     $Jobstep->{'arvados_task'}->reload;
1158     my $task_success = $Jobstep->{'arvados_task'}->{success};
1159
1160     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1161
1162     if (!defined $task_success) {
1163       # task did not indicate one way or the other --> fail
1164       Log($jobstepidx, sprintf(
1165             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1166             exit_status_s($childstatus)));
1167       $Jobstep->{'arvados_task'}->{success} = 0;
1168       $Jobstep->{'arvados_task'}->save;
1169       $task_success = 0;
1170     }
1171
1172     if (!$task_success)
1173     {
1174       my $temporary_fail;
1175       $temporary_fail ||= $Jobstep->{tempfail};
1176       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1177
1178       ++$thisround_failed;
1179       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1180
1181       # Check for signs of a failed or misconfigured node
1182       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1183           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1184         # Don't count this against jobstep failure thresholds if this
1185         # node is already suspected faulty and srun exited quickly
1186         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1187             $elapsed < 5) {
1188           Log ($jobstepidx, "blaming failure on suspect node " .
1189                $slot[$proc{$pid}->{slot}]->{node}->{name});
1190           $temporary_fail ||= 1;
1191         }
1192         ban_node_by_slot($proc{$pid}->{slot});
1193       }
1194
1195       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1196                                 ++$Jobstep->{'failures'},
1197                                 $temporary_fail ? 'temporary' : 'permanent',
1198                                 $elapsed));
1199
1200       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1201         # Give up on this task, and the whole job
1202         $main::success = 0;
1203       }
1204       # Put this task back on the todo queue
1205       push @jobstep_todo, $jobstepidx;
1206       $Job->{'tasks_summary'}->{'failed'}++;
1207     }
1208     else
1209     {
1210       ++$thisround_succeeded;
1211       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1212       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1213       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1214       push @jobstep_done, $jobstepidx;
1215       Log ($jobstepidx, "success in $elapsed seconds");
1216     }
1217     $Jobstep->{exitcode} = $childstatus;
1218     $Jobstep->{finishtime} = time;
1219     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1220     $Jobstep->{'arvados_task'}->save;
1221     process_stderr_final ($jobstepidx);
1222     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1223                               length($Jobstep->{'arvados_task'}->{output}),
1224                               $Jobstep->{'arvados_task'}->{output}));
1225
1226     close $reader{$jobstepidx};
1227     delete $reader{$jobstepidx};
1228     delete $slot[$proc{$pid}->{slot}]->{pid};
1229     push @freeslot, $proc{$pid}->{slot};
1230     delete $proc{$pid};
1231
1232     if ($task_success) {
1233       # Load new tasks
1234       my $newtask_list = [];
1235       my $newtask_results;
1236       do {
1237         $newtask_results = api_call(
1238           "job_tasks/list",
1239           'where' => {
1240             'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1241           },
1242           'order' => 'qsequence',
1243           'offset' => scalar(@$newtask_list),
1244             );
1245         push(@$newtask_list, @{$newtask_results->{items}});
1246       } while (@{$newtask_results->{items}});
1247       foreach my $arvados_task (@$newtask_list) {
1248         my $jobstep = {
1249           'level' => $arvados_task->{'sequence'},
1250           'failures' => 0,
1251           'arvados_task' => $arvados_task
1252         };
1253         push @jobstep, $jobstep;
1254         push @jobstep_todo, $#jobstep;
1255       }
1256     }
1257     $progress_is_dirty = 1;
1258   }
1259
1260   return $children_reaped;
1261 }
1262
1263 sub check_refresh_wanted
1264 {
1265   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1266   if (@stat &&
1267       $stat[9] > $latest_refresh &&
1268       # ...and we have actually locked the job record...
1269       $job_id eq $Job->{'uuid'}) {
1270     $latest_refresh = scalar time;
1271     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1272     for my $attr ('cancelled_at',
1273                   'cancelled_by_user_uuid',
1274                   'cancelled_by_client_uuid',
1275                   'state') {
1276       $Job->{$attr} = $Job2->{$attr};
1277     }
1278     if ($Job->{'state'} ne "Running") {
1279       if ($Job->{'state'} eq "Cancelled") {
1280         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1281       } else {
1282         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1283       }
1284       $main::success = 0;
1285       $main::please_freeze = 1;
1286     }
1287   }
1288 }
1289
1290 sub check_squeue
1291 {
1292   my $last_squeue_check = $squeue_checked;
1293
1294   # Do not call `squeue` or check the kill list more than once every
1295   # 15 seconds.
1296   return if $last_squeue_check > time - 15;
1297   $squeue_checked = time;
1298
1299   # Look for children from which we haven't received stderr data since
1300   # the last squeue check. If no such children exist, all procs are
1301   # alive and there's no need to even look at squeue.
1302   #
1303   # As long as the crunchstat poll interval (10s) is shorter than the
1304   # squeue check interval (15s) this should make the squeue check an
1305   # infrequent event.
1306   my $silent_procs = 0;
1307   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1308   {
1309     if (!exists($js->{stderr_at}))
1310     {
1311       $js->{stderr_at} = 0;
1312     }
1313     if ($js->{stderr_at} < $last_squeue_check)
1314     {
1315       $silent_procs++;
1316     }
1317   }
1318   return if $silent_procs == 0;
1319
1320   # use killem() on procs whose killtime is reached
1321   while (my ($pid, $procinfo) = each %proc)
1322   {
1323     my $js = $jobstep[$procinfo->{jobstepidx}];
1324     if (exists $procinfo->{killtime}
1325         && $procinfo->{killtime} <= time
1326         && $js->{stderr_at} < $last_squeue_check)
1327     {
1328       my $sincewhen = "";
1329       if ($js->{stderr_at}) {
1330         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1331       }
1332       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1333       killem ($pid);
1334     }
1335   }
1336
1337   if (!$have_slurm)
1338   {
1339     # here is an opportunity to check for mysterious problems with local procs
1340     return;
1341   }
1342
1343   # Get a list of steps still running.  Note: squeue(1) says --steps
1344   # selects a format (which we override anyway) and allows us to
1345   # specify which steps we're interested in (which we don't).
1346   # Importantly, it also changes the meaning of %j from "job name" to
1347   # "step name" and (although this isn't mentioned explicitly in the
1348   # docs) switches from "one line per job" mode to "one line per step"
1349   # mode. Without it, we'd just get a list of one job, instead of a
1350   # list of N steps.
1351   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1352   if ($? != 0)
1353   {
1354     Log(undef, "warning: squeue exit status $? ($!)");
1355     return;
1356   }
1357   chop @squeue;
1358
1359   # which of my jobsteps are running, according to squeue?
1360   my %ok;
1361   for my $jobstepname (@squeue)
1362   {
1363     $ok{$jobstepname} = 1;
1364   }
1365
1366   # Check for child procs >60s old and not mentioned by squeue.
1367   while (my ($pid, $procinfo) = each %proc)
1368   {
1369     if ($procinfo->{time} < time - 60
1370         && $procinfo->{jobstepname}
1371         && !exists $ok{$procinfo->{jobstepname}}
1372         && !exists $procinfo->{killtime})
1373     {
1374       # According to slurm, this task has ended (successfully or not)
1375       # -- but our srun child hasn't exited. First we must wait (30
1376       # seconds) in case this is just a race between communication
1377       # channels. Then, if our srun child process still hasn't
1378       # terminated, we'll conclude some slurm communication
1379       # error/delay has caused the task to die without notifying srun,
1380       # and we'll kill srun ourselves.
1381       $procinfo->{killtime} = time + 30;
1382       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1383     }
1384   }
1385 }
1386
1387
1388 sub release_allocation
1389 {
1390   if ($have_slurm)
1391   {
1392     Log (undef, "release job allocation");
1393     system "scancel $ENV{SLURM_JOB_ID}";
1394   }
1395 }
1396
1397
1398 sub readfrompipes
1399 {
1400   my $gotsome = 0;
1401   my %fd_job;
1402   my $sel = IO::Select->new();
1403   foreach my $jobstepidx (keys %reader)
1404   {
1405     my $fd = $reader{$jobstepidx};
1406     $sel->add($fd);
1407     $fd_job{$fd} = $jobstepidx;
1408
1409     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1410       $sel->add($stdout_fd);
1411       $fd_job{$stdout_fd} = $jobstepidx;
1412     }
1413   }
1414   # select on all reader fds with 0.1s timeout
1415   my @ready_fds = $sel->can_read(0.1);
1416   foreach my $fd (@ready_fds)
1417   {
1418     my $buf;
1419     if (0 < sysread ($fd, $buf, 65536))
1420     {
1421       $gotsome = 1;
1422       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1423
1424       my $jobstepidx = $fd_job{$fd};
1425       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1426         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1427         next;
1428       }
1429
1430       $jobstep[$jobstepidx]->{stderr_at} = time;
1431       $jobstep[$jobstepidx]->{stderr} .= $buf;
1432
1433       # Consume everything up to the last \n
1434       preprocess_stderr ($jobstepidx);
1435
1436       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1437       {
1438         # If we get a lot of stderr without a newline, chop off the
1439         # front to avoid letting our buffer grow indefinitely.
1440         substr ($jobstep[$jobstepidx]->{stderr},
1441                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1442       }
1443     }
1444   }
1445   return $gotsome;
1446 }
1447
1448
1449 # Consume all full lines of stderr for a jobstep. Everything after the
1450 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1451 # returning.
1452 sub preprocess_stderr
1453 {
1454   my $jobstepidx = shift;
1455
1456   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1457     my $line = $1;
1458     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1459     Log ($jobstepidx, "stderr $line");
1460     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1461       # whoa.
1462       $main::please_freeze = 1;
1463     }
1464     elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
1465       # Skip the following tempfail checks if this srun proc isn't
1466       # attached to a particular worker slot.
1467     }
1468     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1469       my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1470       my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1471       $slot[$job_slot_index]->{node}->{fail_count}++;
1472       $jobstep[$jobstepidx]->{tempfail} = 1;
1473       ban_node_by_slot($job_slot_index);
1474     }
1475     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1476       $jobstep[$jobstepidx]->{tempfail} = 1;
1477       ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
1478     }
1479     elsif ($line =~ /arvados\.errors\.Keep/) {
1480       $jobstep[$jobstepidx]->{tempfail} = 1;
1481     }
1482   }
1483 }
1484
1485
1486 sub process_stderr_final
1487 {
1488   my $jobstepidx = shift;
1489   preprocess_stderr ($jobstepidx);
1490
1491   map {
1492     Log ($jobstepidx, "stderr $_");
1493   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1494   $jobstep[$jobstepidx]->{stderr} = '';
1495 }
1496
1497 sub fetch_block
1498 {
1499   my $hash = shift;
1500   my $keep;
1501   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1502     Log(undef, "fetch_block run error from arv-get $hash: $!");
1503     return undef;
1504   }
1505   my $output_block = "";
1506   while (1) {
1507     my $buf;
1508     my $bytes = sysread($keep, $buf, 1024 * 1024);
1509     if (!defined $bytes) {
1510       Log(undef, "fetch_block read error from arv-get: $!");
1511       $output_block = undef;
1512       last;
1513     } elsif ($bytes == 0) {
1514       # sysread returns 0 at the end of the pipe.
1515       last;
1516     } else {
1517       # some bytes were read into buf.
1518       $output_block .= $buf;
1519     }
1520   }
1521   close $keep;
1522   if ($?) {
1523     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1524     $output_block = undef;
1525   }
1526   return $output_block;
1527 }
1528
1529 # Create a collection by concatenating the output of all tasks (each
1530 # task's output is either a manifest fragment, a locator for a
1531 # manifest fragment stored in Keep, or nothing at all). Return the
1532 # portable_data_hash of the new collection.
1533 sub create_output_collection
1534 {
1535   Log (undef, "collate");
1536
1537   my ($child_out, $child_in);
1538   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1539 import arvados
1540 import sys
1541 print (arvados.api("v1").collections().
1542        create(body={"manifest_text": sys.stdin.read()}).
1543        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1544 }, retry_count());
1545
1546   my $task_idx = -1;
1547   my $manifest_size = 0;
1548   for (@jobstep)
1549   {
1550     ++$task_idx;
1551     my $output = $_->{'arvados_task'}->{output};
1552     next if (!defined($output));
1553     my $next_write;
1554     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1555       $next_write = fetch_block($output);
1556     } else {
1557       $next_write = $output;
1558     }
1559     if (defined($next_write)) {
1560       if (!defined(syswrite($child_in, $next_write))) {
1561         # There's been an error writing.  Stop the loop.
1562         # We'll log details about the exit code later.
1563         last;
1564       } else {
1565         $manifest_size += length($next_write);
1566       }
1567     } else {
1568       my $uuid = $_->{'arvados_task'}->{'uuid'};
1569       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1570       $main::success = 0;
1571     }
1572   }
1573   close($child_in);
1574   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1575
1576   my $joboutput;
1577   my $s = IO::Select->new($child_out);
1578   if ($s->can_read(120)) {
1579     sysread($child_out, $joboutput, 1024 * 1024);
1580     waitpid($pid, 0);
1581     if ($?) {
1582       Log(undef, "output collection creation exited " . exit_status_s($?));
1583       $joboutput = undef;
1584     } else {
1585       chomp($joboutput);
1586     }
1587   } else {
1588     Log (undef, "timed out while creating output collection");
1589     foreach my $signal (2, 2, 2, 15, 15, 9) {
1590       kill($signal, $pid);
1591       last if waitpid($pid, WNOHANG) == -1;
1592       sleep(1);
1593     }
1594   }
1595   close($child_out);
1596
1597   return $joboutput;
1598 }
1599
1600 # Calls create_output_collection, logs the result, and returns it.
1601 # If that was successful, save that as the output in the job record.
1602 sub save_output_collection {
1603   my $collated_output = create_output_collection();
1604
1605   if (!$collated_output) {
1606     Log(undef, "Failed to write output collection");
1607   }
1608   else {
1609     Log(undef, "job output $collated_output");
1610     $Job->update_attributes('output' => $collated_output);
1611   }
1612   return $collated_output;
1613 }
1614
1615 sub killem
1616 {
1617   foreach (@_)
1618   {
1619     my $sig = 2;                # SIGINT first
1620     if (exists $proc{$_}->{"sent_$sig"} &&
1621         time - $proc{$_}->{"sent_$sig"} > 4)
1622     {
1623       $sig = 15;                # SIGTERM if SIGINT doesn't work
1624     }
1625     if (exists $proc{$_}->{"sent_$sig"} &&
1626         time - $proc{$_}->{"sent_$sig"} > 4)
1627     {
1628       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1629     }
1630     if (!exists $proc{$_}->{"sent_$sig"})
1631     {
1632       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1633       kill $sig, $_;
1634       select (undef, undef, undef, 0.1);
1635       if ($sig == 2)
1636       {
1637         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1638       }
1639       $proc{$_}->{"sent_$sig"} = time;
1640       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1641     }
1642   }
1643 }
1644
1645
1646 sub fhbits
1647 {
1648   my($bits);
1649   for (@_) {
1650     vec($bits,fileno($_),1) = 1;
1651   }
1652   $bits;
1653 }
1654
1655
1656 # Send log output to Keep via arv-put.
1657 #
1658 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1659 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1660 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1661 # $log_pipe_pid is the pid of the arv-put subprocess.
1662 #
1663 # The only functions that should access these variables directly are:
1664 #
1665 # log_writer_start($logfilename)
1666 #     Starts an arv-put pipe, reading data on stdin and writing it to
1667 #     a $logfilename file in an output collection.
1668 #
1669 # log_writer_read_output([$timeout])
1670 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1671 #     Passes $timeout to the select() call, with a default of 0.01.
1672 #     Returns the result of the last read() call on $log_pipe_out, or
1673 #     -1 if read() wasn't called because select() timed out.
1674 #     Only other log_writer_* functions should need to call this.
1675 #
1676 # log_writer_send($txt)
1677 #     Writes $txt to the output log collection.
1678 #
1679 # log_writer_finish()
1680 #     Closes the arv-put pipe and returns the output that it produces.
1681 #
1682 # log_writer_is_active()
1683 #     Returns a true value if there is currently a live arv-put
1684 #     process, false otherwise.
1685 #
1686 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1687     $log_pipe_pid);
1688
1689 sub log_writer_start($)
1690 {
1691   my $logfilename = shift;
1692   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1693                         'arv-put',
1694                         '--stream',
1695                         '--retries', '3',
1696                         '--filename', $logfilename,
1697                         '-');
1698   $log_pipe_out_buf = "";
1699   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1700 }
1701
1702 sub log_writer_read_output {
1703   my $timeout = shift || 0.01;
1704   my $read = -1;
1705   while ($read && $log_pipe_out_select->can_read($timeout)) {
1706     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1707                  length($log_pipe_out_buf));
1708   }
1709   if (!defined($read)) {
1710     Log(undef, "error reading log manifest from arv-put: $!");
1711   }
1712   return $read;
1713 }
1714
1715 sub log_writer_send($)
1716 {
1717   my $txt = shift;
1718   print $log_pipe_in $txt;
1719   log_writer_read_output();
1720 }
1721
1722 sub log_writer_finish()
1723 {
1724   return unless $log_pipe_pid;
1725
1726   close($log_pipe_in);
1727
1728   my $logger_failed = 0;
1729   my $read_result = log_writer_read_output(120);
1730   if ($read_result == -1) {
1731     $logger_failed = -1;
1732     Log (undef, "timed out reading from 'arv-put'");
1733   } elsif ($read_result != 0) {
1734     $logger_failed = -2;
1735     Log(undef, "failed to read arv-put log manifest to EOF");
1736   }
1737
1738   waitpid($log_pipe_pid, 0);
1739   if ($?) {
1740     $logger_failed ||= $?;
1741     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1742   }
1743
1744   close($log_pipe_out);
1745   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1746   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1747       $log_pipe_out_select = undef;
1748
1749   return $arv_put_output;
1750 }
1751
1752 sub log_writer_is_active() {
1753   return $log_pipe_pid;
1754 }
1755
1756 sub Log                         # ($jobstepidx, $logmessage)
1757 {
1758   my ($jobstepidx, $logmessage) = @_;
1759   if ($logmessage =~ /\n/) {
1760     for my $line (split (/\n/, $_[1])) {
1761       Log ($jobstepidx, $line);
1762     }
1763     return;
1764   }
1765   my $fh = select STDERR; $|=1; select $fh;
1766   my $task_qseq = '';
1767   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1768     $task_qseq = $jobstepidx;
1769   }
1770   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1771   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1772   $message .= "\n";
1773   my $datetime;
1774   if (log_writer_is_active() || -t STDERR) {
1775     my @gmtime = gmtime;
1776     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1777                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1778   }
1779   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1780
1781   if (log_writer_is_active()) {
1782     log_writer_send($datetime . " " . $message);
1783   }
1784 }
1785
1786
1787 sub croak
1788 {
1789   my ($package, $file, $line) = caller;
1790   my $message = "@_ at $file line $line\n";
1791   Log (undef, $message);
1792   freeze() if @jobstep_todo;
1793   create_output_collection() if @jobstep_todo;
1794   cleanup();
1795   save_meta();
1796   die;
1797 }
1798
1799
1800 sub cleanup
1801 {
1802   return unless $Job;
1803   if ($Job->{'state'} eq 'Cancelled') {
1804     $Job->update_attributes('finished_at' => scalar gmtime);
1805   } else {
1806     $Job->update_attributes('state' => 'Failed');
1807   }
1808 }
1809
1810
1811 sub save_meta
1812 {
1813   my $justcheckpoint = shift; # false if this will be the last meta saved
1814   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1815   return unless log_writer_is_active();
1816   my $log_manifest = log_writer_finish();
1817   return unless defined($log_manifest);
1818
1819   if ($Job->{log}) {
1820     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1821     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1822   }
1823
1824   my $log_coll = api_call(
1825     "collections/create", ensure_unique_name => 1, collection => {
1826       manifest_text => $log_manifest,
1827       owner_uuid => $Job->{owner_uuid},
1828       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1829     });
1830   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1831   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1832 }
1833
1834
1835 sub freeze_if_want_freeze
1836 {
1837   if ($main::please_freeze)
1838   {
1839     release_allocation();
1840     if (@_)
1841     {
1842       # kill some srun procs before freeze+stop
1843       map { $proc{$_} = {} } @_;
1844       while (%proc)
1845       {
1846         killem (keys %proc);
1847         select (undef, undef, undef, 0.1);
1848         my $died;
1849         while (($died = waitpid (-1, WNOHANG)) > 0)
1850         {
1851           delete $proc{$died};
1852         }
1853       }
1854     }
1855     freeze();
1856     create_output_collection();
1857     cleanup();
1858     save_meta();
1859     exit 1;
1860   }
1861 }
1862
1863
1864 sub freeze
1865 {
1866   Log (undef, "Freeze not implemented");
1867   return;
1868 }
1869
1870
1871 sub thaw
1872 {
1873   croak ("Thaw not implemented");
1874 }
1875
1876
1877 sub freezequote
1878 {
1879   my $s = shift;
1880   $s =~ s/\\/\\\\/g;
1881   $s =~ s/\n/\\n/g;
1882   return $s;
1883 }
1884
1885
1886 sub freezeunquote
1887 {
1888   my $s = shift;
1889   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1890   return $s;
1891 }
1892
1893
1894 sub srun_sync
1895 {
1896   my $srunargs = shift;
1897   my $execargs = shift;
1898   my $opts = shift || {};
1899   my $stdin = shift;
1900
1901   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1902   Log (undef, "$label: start");
1903
1904   my ($stderr_r, $stderr_w);
1905   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1906
1907   my ($stdout_r, $stdout_w);
1908   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1909
1910   my $srunpid = fork();
1911   if ($srunpid == 0)
1912   {
1913     close($stderr_r);
1914     close($stdout_r);
1915     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1916     fcntl($stdout_w, F_SETFL, 0) or croak($!);
1917     open(STDERR, ">&", $stderr_w);
1918     open(STDOUT, ">&", $stdout_w);
1919     srun ($srunargs, $execargs, $opts, $stdin);
1920     exit (1);
1921   }
1922   close($stderr_w);
1923   close($stdout_w);
1924
1925   set_nonblocking($stderr_r);
1926   set_nonblocking($stdout_r);
1927
1928   # Add entries to @jobstep and %proc so check_squeue() and
1929   # freeze_if_want_freeze() can treat it like a job task process.
1930   push @jobstep, {
1931     stderr => '',
1932     stderr_at => 0,
1933     stderr_captured => '',
1934     stdout_r => $stdout_r,
1935     stdout_captured => '',
1936   };
1937   my $jobstepidx = $#jobstep;
1938   $proc{$srunpid} = {
1939     jobstepidx => $jobstepidx,
1940   };
1941   $reader{$jobstepidx} = $stderr_r;
1942
1943   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
1944     my $busy = readfrompipes();
1945     if (!$busy || ($latest_refresh + 2 < scalar time)) {
1946       check_refresh_wanted();
1947       check_squeue();
1948     }
1949     if (!$busy) {
1950       select(undef, undef, undef, 0.1);
1951     }
1952     killem(keys %proc) if $main::please_freeze;
1953   }
1954   my $exited = $?;
1955
1956   1 while readfrompipes();
1957   process_stderr_final ($jobstepidx);
1958
1959   Log (undef, "$label: exit ".exit_status_s($exited));
1960
1961   close($stdout_r);
1962   close($stderr_r);
1963   delete $proc{$srunpid};
1964   delete $reader{$jobstepidx};
1965
1966   my $j = pop @jobstep;
1967   return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
1968 }
1969
1970
1971 sub srun
1972 {
1973   my $srunargs = shift;
1974   my $execargs = shift;
1975   my $opts = shift || {};
1976   my $stdin = shift;
1977   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1978
1979   $Data::Dumper::Terse = 1;
1980   $Data::Dumper::Indent = 0;
1981   my $show_cmd = Dumper($args);
1982   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1983   $show_cmd =~ s/\n/ /g;
1984   if ($opts->{fork}) {
1985     Log(undef, "starting: $show_cmd");
1986   } else {
1987     # This is a child process: parent is in charge of reading our
1988     # stderr and copying it to Log() if needed.
1989     warn "starting: $show_cmd\n";
1990   }
1991
1992   if (defined $stdin) {
1993     my $child = open STDIN, "-|";
1994     defined $child or die "no fork: $!";
1995     if ($child == 0) {
1996       print $stdin or die $!;
1997       close STDOUT or die $!;
1998       exit 0;
1999     }
2000   }
2001
2002   return system (@$args) if $opts->{fork};
2003
2004   exec @$args;
2005   warn "ENV size is ".length(join(" ",%ENV));
2006   die "exec failed: $!: @$args";
2007 }
2008
2009
2010 sub ban_node_by_slot {
2011   # Don't start any new jobsteps on this node for 60 seconds
2012   my $slotid = shift;
2013   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2014   $slot[$slotid]->{node}->{hold_count}++;
2015   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2016 }
2017
2018 sub must_lock_now
2019 {
2020   my ($lockfile, $error_message) = @_;
2021   open L, ">", $lockfile or croak("$lockfile: $!");
2022   if (!flock L, LOCK_EX|LOCK_NB) {
2023     croak("Can't lock $lockfile: $error_message\n");
2024   }
2025 }
2026
2027 sub find_docker_image {
2028   # Given a Keep locator, check to see if it contains a Docker image.
2029   # If so, return its stream name and Docker hash.
2030   # If not, return undef for both values.
2031   my $locator = shift;
2032   my ($streamname, $filename);
2033   my $image = api_call("collections/get", uuid => $locator);
2034   if ($image) {
2035     foreach my $line (split(/\n/, $image->{manifest_text})) {
2036       my @tokens = split(/\s+/, $line);
2037       next if (!@tokens);
2038       $streamname = shift(@tokens);
2039       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2040         if (defined($filename)) {
2041           return (undef, undef);  # More than one file in the Collection.
2042         } else {
2043           $filename = (split(/:/, $filedata, 3))[2];
2044         }
2045       }
2046     }
2047   }
2048   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
2049     return ($streamname, $1);
2050   } else {
2051     return (undef, undef);
2052   }
2053 }
2054
2055 sub retry_count {
2056   # Calculate the number of times an operation should be retried,
2057   # assuming exponential backoff, and that we're willing to retry as
2058   # long as tasks have been running.  Enforce a minimum of 3 retries.
2059   my ($starttime, $endtime, $timediff, $retries);
2060   if (@jobstep) {
2061     $starttime = $jobstep[0]->{starttime};
2062     $endtime = $jobstep[-1]->{finishtime};
2063   }
2064   if (!defined($starttime)) {
2065     $timediff = 0;
2066   } elsif (!defined($endtime)) {
2067     $timediff = time - $starttime;
2068   } else {
2069     $timediff = ($endtime - $starttime) - (time - $endtime);
2070   }
2071   if ($timediff > 0) {
2072     $retries = int(log($timediff) / log(2));
2073   } else {
2074     $retries = 1;  # Use the minimum.
2075   }
2076   return ($retries > 3) ? $retries : 3;
2077 }
2078
2079 sub retry_op {
2080   # Pass in two function references.
2081   # This method will be called with the remaining arguments.
2082   # If it dies, retry it with exponential backoff until it succeeds,
2083   # or until the current retry_count is exhausted.  After each failure
2084   # that can be retried, the second function will be called with
2085   # the current try count (0-based), next try time, and error message.
2086   my $operation = shift;
2087   my $retry_callback = shift;
2088   my $retries = retry_count();
2089   foreach my $try_count (0..$retries) {
2090     my $next_try = time + (2 ** $try_count);
2091     my $result = eval { $operation->(@_); };
2092     if (!$@) {
2093       return $result;
2094     } elsif ($try_count < $retries) {
2095       $retry_callback->($try_count, $next_try, $@);
2096       my $sleep_time = $next_try - time;
2097       sleep($sleep_time) if ($sleep_time > 0);
2098     }
2099   }
2100   # Ensure the error message ends in a newline, so Perl doesn't add
2101   # retry_op's line number to it.
2102   chomp($@);
2103   die($@ . "\n");
2104 }
2105
2106 sub api_call {
2107   # Pass in a /-separated API method name, and arguments for it.
2108   # This function will call that method, retrying as needed until
2109   # the current retry_count is exhausted, with a log on the first failure.
2110   my $method_name = shift;
2111   my $log_api_retry = sub {
2112     my ($try_count, $next_try_at, $errmsg) = @_;
2113     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2114     $errmsg =~ s/\s/ /g;
2115     $errmsg =~ s/\s+$//;
2116     my $retry_msg;
2117     if ($next_try_at < time) {
2118       $retry_msg = "Retrying.";
2119     } else {
2120       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2121       $retry_msg = "Retrying at $next_try_fmt.";
2122     }
2123     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2124   };
2125   my $method = $arv;
2126   foreach my $key (split(/\//, $method_name)) {
2127     $method = $method->{$key};
2128   }
2129   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2130 }
2131
2132 sub exit_status_s {
2133   # Given a $?, return a human-readable exit code string like "0" or
2134   # "1" or "0 with signal 1" or "1 with signal 11".
2135   my $exitcode = shift;
2136   my $s = $exitcode >> 8;
2137   if ($exitcode & 0x7f) {
2138     $s .= " with signal " . ($exitcode & 0x7f);
2139   }
2140   if ($exitcode & 0x80) {
2141     $s .= " with core dump";
2142   }
2143   return $s;
2144 }
2145
2146 sub handle_readall {
2147   # Pass in a glob reference to a file handle.
2148   # Read all its contents and return them as a string.
2149   my $fh_glob_ref = shift;
2150   local $/ = undef;
2151   return <$fh_glob_ref>;
2152 }
2153
2154 sub tar_filename_n {
2155   my $n = shift;
2156   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2157 }
2158
2159 sub add_git_archive {
2160   # Pass in a git archive command as a string or list, a la system().
2161   # This method will save its output to be included in the archive sent to the
2162   # build script.
2163   my $git_input;
2164   $git_tar_count++;
2165   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2166     croak("Failed to save git archive: $!");
2167   }
2168   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2169   close($git_input);
2170   waitpid($git_pid, 0);
2171   close(GIT_ARCHIVE);
2172   if ($?) {
2173     croak("Failed to save git archive: git exited " . exit_status_s($?));
2174   }
2175 }
2176
2177 sub combined_git_archive {
2178   # Combine all saved tar archives into a single archive, then return its
2179   # contents in a string.  Return undef if no archives have been saved.
2180   if ($git_tar_count < 1) {
2181     return undef;
2182   }
2183   my $base_tar_name = tar_filename_n(1);
2184   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2185     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2186     if ($tar_exit != 0) {
2187       croak("Error preparing build archive: tar -A exited " .
2188             exit_status_s($tar_exit));
2189     }
2190   }
2191   if (!open(GIT_TAR, "<", $base_tar_name)) {
2192     croak("Could not open build archive: $!");
2193   }
2194   my $tar_contents = handle_readall(\*GIT_TAR);
2195   close(GIT_TAR);
2196   return $tar_contents;
2197 }
2198
2199 sub set_nonblocking {
2200   my $fh = shift;
2201   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2202   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2203 }
2204
2205 __DATA__
2206 #!/usr/bin/env perl
2207 #
2208 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2209 # server invokes this script on individual compute nodes, or localhost if we're
2210 # running a job locally.  It gets called in two modes:
2211 #
2212 # * No arguments: Installation mode.  Read a tar archive from the DATA
2213 #   file handle; it includes the Crunch script's source code, and
2214 #   maybe SDKs as well.  Those should be installed in the proper
2215 #   locations.  This runs outside of any Docker container, so don't try to
2216 #   introspect Crunch's runtime environment.
2217 #
2218 # * With arguments: Crunch script run mode.  This script should set up the
2219 #   environment, then run the command specified in the arguments.  This runs
2220 #   inside any Docker container.
2221
2222 use Fcntl ':flock';
2223 use File::Path qw( make_path remove_tree );
2224 use POSIX qw(getcwd);
2225
2226 use constant TASK_TEMPFAIL => 111;
2227
2228 # Map SDK subdirectories to the path environments they belong to.
2229 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2230
2231 my $destdir = $ENV{"CRUNCH_SRC"};
2232 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2233 my $repo = $ENV{"CRUNCH_SRC_URL"};
2234 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2235 my $job_work = $ENV{"JOB_WORK"};
2236 my $task_work = $ENV{"TASK_WORK"};
2237
2238 open(STDOUT_ORIG, ">&", STDOUT);
2239 open(STDERR_ORIG, ">&", STDERR);
2240
2241 for my $dir ($destdir, $job_work, $task_work) {
2242   if ($dir) {
2243     make_path $dir;
2244     -e $dir or die "Failed to create temporary directory ($dir): $!";
2245   }
2246 }
2247
2248 if ($task_work) {
2249   remove_tree($task_work, {keep_root => 1});
2250 }
2251
2252 ### Crunch script run mode
2253 if (@ARGV) {
2254   # We want to do routine logging during task 0 only.  This gives the user
2255   # the information they need, but avoids repeating the information for every
2256   # task.
2257   my $Log;
2258   if ($ENV{TASK_SEQUENCE} eq "0") {
2259     $Log = sub {
2260       my $msg = shift;
2261       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2262     };
2263   } else {
2264     $Log = sub { };
2265   }
2266
2267   my $python_src = "$install_dir/python";
2268   my $venv_dir = "$job_work/.arvados.venv";
2269   my $venv_built = -e "$venv_dir/bin/activate";
2270   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2271     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2272                  "--python=python2.7", $venv_dir);
2273     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2274     $venv_built = 1;
2275     $Log->("Built Python SDK virtualenv");
2276   }
2277
2278   my @pysdk_version_cmd = ("python", "-c",
2279     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2280   if ($venv_built) {
2281     $Log->("Running in Python SDK virtualenv");
2282     @pysdk_version_cmd = ();
2283     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2284     @ARGV = ("/bin/sh", "-ec",
2285              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2286   } elsif (-d $python_src) {
2287     $Log->("Warning: virtualenv not found inside Docker container default " .
2288            "\$PATH. Can't install Python SDK.");
2289   }
2290
2291   if (@pysdk_version_cmd) {
2292     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2293     my $pysdk_version = <$pysdk_version_pipe>;
2294     close($pysdk_version_pipe);
2295     if ($? == 0) {
2296       chomp($pysdk_version);
2297       $Log->("Using Arvados SDK version $pysdk_version");
2298     } else {
2299       # A lot could've gone wrong here, but pretty much all of it means that
2300       # Python won't be able to load the Arvados SDK.
2301       $Log->("Warning: Arvados SDK not found");
2302     }
2303   }
2304
2305   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2306     my $sdk_path = "$install_dir/$sdk_dir";
2307     if (-d $sdk_path) {
2308       if ($ENV{$sdk_envkey}) {
2309         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2310       } else {
2311         $ENV{$sdk_envkey} = $sdk_path;
2312       }
2313       $Log->("Arvados SDK added to %s", $sdk_envkey);
2314     }
2315   }
2316
2317   exec(@ARGV);
2318   die "Cannot exec `@ARGV`: $!";
2319 }
2320
2321 ### Installation mode
2322 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2323 flock L, LOCK_EX;
2324 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2325   # This exact git archive (source + arvados sdk) is already installed
2326   # here, so there's no need to reinstall it.
2327
2328   # We must consume our DATA section, though: otherwise the process
2329   # feeding it to us will get SIGPIPE.
2330   my $buf;
2331   while (read(DATA, $buf, 65536)) { }
2332
2333   exit(0);
2334 }
2335
2336 unlink "$destdir.archive_hash";
2337 mkdir $destdir;
2338
2339 do {
2340   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2341   local $SIG{PIPE} = "IGNORE";
2342   warn "Extracting archive: $archive_hash\n";
2343   # --ignore-zeros is necessary sometimes: depending on how much NUL
2344   # padding tar -A put on our combined archive (which in turn depends
2345   # on the length of the component archives) tar without
2346   # --ignore-zeros will exit before consuming stdin and cause close()
2347   # to fail on the resulting SIGPIPE.
2348   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2349     die "Error launching 'tar -xC $destdir': $!";
2350   }
2351   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2352   # get SIGPIPE.  We must feed it data incrementally.
2353   my $tar_input;
2354   while (read(DATA, $tar_input, 65536)) {
2355     print TARX $tar_input;
2356   }
2357   if(!close(TARX)) {
2358     die "'tar -xC $destdir' exited $?: $!";
2359   }
2360 };
2361
2362 mkdir $install_dir;
2363
2364 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2365 if (-d $sdk_root) {
2366   foreach my $sdk_lang (("python",
2367                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2368     if (-d "$sdk_root/$sdk_lang") {
2369       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2370         die "Failed to install $sdk_lang SDK: $!";
2371       }
2372     }
2373   }
2374 }
2375
2376 my $python_dir = "$install_dir/python";
2377 if ((-d $python_dir) and can_run("python2.7")) {
2378   open(my $egg_info_pipe, "-|",
2379        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2380   my @egg_info_errors = <$egg_info_pipe>;
2381   close($egg_info_pipe);
2382
2383   if ($?) {
2384     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2385       # egg_info apparently failed because it couldn't ask git for a build tag.
2386       # Specify no build tag.
2387       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2388       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2389       close($pysdk_cfg);
2390     } else {
2391       my $egg_info_exit = $? >> 8;
2392       foreach my $errline (@egg_info_errors) {
2393         warn $errline;
2394       }
2395       warn "python setup.py egg_info failed: exit $egg_info_exit";
2396       exit ($egg_info_exit || 1);
2397     }
2398   }
2399 }
2400
2401 # Hide messages from the install script (unless it fails: shell_or_die
2402 # will show $destdir.log in that case).
2403 open(STDOUT, ">>", "$destdir.log");
2404 open(STDERR, ">&", STDOUT);
2405
2406 if (-e "$destdir/crunch_scripts/install") {
2407     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2408 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2409     # Old version
2410     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2411 } elsif (-e "./install.sh") {
2412     shell_or_die (undef, "./install.sh", $install_dir);
2413 }
2414
2415 if ($archive_hash) {
2416     unlink "$destdir.archive_hash.new";
2417     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2418     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2419 }
2420
2421 close L;
2422
2423 sub can_run {
2424   my $command_name = shift;
2425   open(my $which, "-|", "which", $command_name);
2426   while (<$which>) { }
2427   close($which);
2428   return ($? == 0);
2429 }
2430
2431 sub shell_or_die
2432 {
2433   my $exitcode = shift;
2434
2435   if ($ENV{"DEBUG"}) {
2436     print STDERR "@_\n";
2437   }
2438   if (system (@_) != 0) {
2439     my $err = $!;
2440     my $code = $?;
2441     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2442     open STDERR, ">&STDERR_ORIG";
2443     system ("cat $destdir.log >&2");
2444     warn "@_ failed ($err): $exitstatus";
2445     if (defined($exitcode)) {
2446       exit $exitcode;
2447     }
2448     else {
2449       exit (($code >> 8) || 1);
2450     }
2451   }
2452 }
2453
2454 __DATA__