8807: crunch-job saves logs before exiting to retry.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 $ENV{"HOST_CRUNCHRUNNER_BIN"} ||= `which crunchrunner`;
113 unless (defined($ENV{"HOST_CERTS"})) {
114   if (-f "/etc/ssl/certs/ca-certificates.crt") {
115     $ENV{"HOST_CERTS"} = "/etc/ssl/certs/ca-certificates.crt";
116   } elsif (-f "/etc/pki/tls/certs/ca-bundle.crt") {
117     $ENV{"HOST_CERTS"} = "/etc/pki/tls/certs/ca-bundle.crt";
118   }
119 }
120
121 # Create the tmp directory if it does not exist
122 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
123   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
124 }
125
126 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
127 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
128 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
129 mkdir ($ENV{"JOB_WORK"});
130
131 my %proc;
132 my $force_unlock;
133 my $git_dir;
134 my $jobspec;
135 my $job_api_token;
136 my $no_clear_tmp;
137 my $resume_stash;
138 my $cgroup_root = "/sys/fs/cgroup";
139 my $docker_bin = "docker.io";
140 my $docker_run_args = "";
141 GetOptions('force-unlock' => \$force_unlock,
142            'git-dir=s' => \$git_dir,
143            'job=s' => \$jobspec,
144            'job-api-token=s' => \$job_api_token,
145            'no-clear-tmp' => \$no_clear_tmp,
146            'resume-stash=s' => \$resume_stash,
147            'cgroup-root=s' => \$cgroup_root,
148            'docker-bin=s' => \$docker_bin,
149            'docker-run-args=s' => \$docker_run_args,
150     );
151
152 if (defined $job_api_token) {
153   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
154 }
155
156 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
157
158
159 $SIG{'USR1'} = sub
160 {
161   $main::ENV{CRUNCH_DEBUG} = 1;
162 };
163 $SIG{'USR2'} = sub
164 {
165   $main::ENV{CRUNCH_DEBUG} = 0;
166 };
167
168 my $arv = Arvados->new('apiVersion' => 'v1');
169
170 my $Job;
171 my $job_id;
172 my $dbh;
173 my $sth;
174 my @jobstep;
175
176 my $local_job;
177 if ($jobspec =~ /^[-a-z\d]+$/)
178 {
179   # $jobspec is an Arvados UUID, not a JSON job specification
180   $Job = api_call("jobs/get", uuid => $jobspec);
181   $job_id = $Job->{'uuid'};
182   log_writer_start("${job_id}.log.txt");
183   $local_job = 0;
184 }
185 else
186 {
187   $local_job = JSON::decode_json($jobspec);
188 }
189
190
191 # Make sure our workers (our slurm nodes, localhost, or whatever) are
192 # at least able to run basic commands: they aren't down or severely
193 # misconfigured.
194 my $cmd = ['true'];
195 if (($Job || $local_job)->{docker_image_locator}) {
196   $cmd = [$docker_bin, 'ps', '-q'];
197 }
198 Log(undef, "Sanity check is `@$cmd`");
199 my ($exited, $stdout, $stderr) = srun_sync(
200   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
201   $cmd,
202   {label => "sanity check"});
203 if ($exited != 0) {
204   Log(undef, "Sanity check failed: ".exit_status_s($exited));
205   save_meta();
206   exit EX_TEMPFAIL;
207 }
208 Log(undef, "Sanity check OK");
209
210
211 my $User = api_call("users/current");
212
213 if (!$local_job) {
214   if (!$force_unlock) {
215     # Claim this job, and make sure nobody else does
216     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
217     if ($@) {
218       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
219       save_meta();
220       exit EX_TEMPFAIL;
221     };
222   }
223 }
224 else
225 {
226   if (!$resume_stash)
227   {
228     map { croak ("No $_ specified") unless $local_job->{$_} }
229     qw(script script_version script_parameters);
230   }
231
232   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
233   $local_job->{'started_at'} = gmtime;
234   $local_job->{'state'} = 'Running';
235
236   $Job = api_call("jobs/create", job => $local_job);
237   $job_id = $Job->{'uuid'};
238   log_writer_start("${job_id}.log.txt");
239 }
240
241 $Job->{'runtime_constraints'} ||= {};
242 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
243 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
244
245 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
246 if ($? == 0) {
247   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
248   chomp($gem_versions);
249   chop($gem_versions);  # Closing parentheses
250 } else {
251   $gem_versions = "";
252 }
253 Log(undef,
254     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
255
256 Log (undef, "check slurm allocation");
257 my @slot;
258 my @node;
259 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
260 my @sinfo;
261 if (!$have_slurm)
262 {
263   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
264   push @sinfo, "$localcpus localhost";
265 }
266 if (exists $ENV{SLURM_NODELIST})
267 {
268   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
269 }
270 foreach (@sinfo)
271 {
272   my ($ncpus, $slurm_nodelist) = split;
273   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
274
275   my @nodelist;
276   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
277   {
278     my $nodelist = $1;
279     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
280     {
281       my $ranges = $1;
282       foreach (split (",", $ranges))
283       {
284         my ($a, $b);
285         if (/(\d+)-(\d+)/)
286         {
287           $a = $1;
288           $b = $2;
289         }
290         else
291         {
292           $a = $_;
293           $b = $_;
294         }
295         push @nodelist, map {
296           my $n = $nodelist;
297           $n =~ s/\[[-,\d]+\]/$_/;
298           $n;
299         } ($a..$b);
300       }
301     }
302     else
303     {
304       push @nodelist, $nodelist;
305     }
306   }
307   foreach my $nodename (@nodelist)
308   {
309     Log (undef, "node $nodename - $ncpus slots");
310     my $node = { name => $nodename,
311                  ncpus => $ncpus,
312                  # The number of consecutive times a task has been dispatched
313                  # to this node and failed.
314                  losing_streak => 0,
315                  # The number of consecutive times that SLURM has reported
316                  # a node failure since the last successful task.
317                  fail_count => 0,
318                  # Don't dispatch work to this node until this time
319                  # (in seconds since the epoch) has passed.
320                  hold_until => 0 };
321     foreach my $cpu (1..$ncpus)
322     {
323       push @slot, { node => $node,
324                     cpu => $cpu };
325     }
326   }
327   push @node, @nodelist;
328 }
329
330
331
332 # Ensure that we get one jobstep running on each allocated node before
333 # we start overloading nodes with concurrent steps
334
335 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
336
337
338 $Job->update_attributes(
339   'tasks_summary' => { 'failed' => 0,
340                        'todo' => 1,
341                        'running' => 0,
342                        'done' => 0 });
343
344 Log (undef, "start");
345 $SIG{'INT'} = sub { $main::please_freeze = 1; };
346 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
347 $SIG{'TERM'} = \&croak;
348 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
349 $SIG{'ALRM'} = sub { $main::please_info = 1; };
350 $SIG{'CONT'} = sub { $main::please_continue = 1; };
351 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
352
353 $main::please_freeze = 0;
354 $main::please_info = 0;
355 $main::please_continue = 0;
356 $main::please_refresh = 0;
357 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
358
359 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
360 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
361 $ENV{"JOB_UUID"} = $job_id;
362
363
364 my @jobstep_todo = ();
365 my @jobstep_done = ();
366 my @jobstep_tomerge = ();
367 my $jobstep_tomerge_level = 0;
368 my $squeue_checked = 0;
369 my $latest_refresh = scalar time;
370
371
372
373 if (defined $Job->{thawedfromkey})
374 {
375   thaw ($Job->{thawedfromkey});
376 }
377 else
378 {
379   my $first_task = api_call("job_tasks/create", job_task => {
380     'job_uuid' => $Job->{'uuid'},
381     'sequence' => 0,
382     'qsequence' => 0,
383     'parameters' => {},
384   });
385   push @jobstep, { 'level' => 0,
386                    'failures' => 0,
387                    'arvados_task' => $first_task,
388                  };
389   push @jobstep_todo, 0;
390 }
391
392
393 if (!$have_slurm)
394 {
395   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
396 }
397
398 my $build_script = handle_readall(\*DATA);
399 my $nodelist = join(",", @node);
400 my $git_tar_count = 0;
401
402 if (!defined $no_clear_tmp) {
403   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
404   # up work directories crunch_tmp/work, crunch_tmp/opt,
405   # crunch_tmp/src*.
406   #
407   # TODO: When #5036 is done and widely deployed, we can limit mount's
408   # -t option to simply fuse.keep.
409   my ($exited, $stdout, $stderr) = srun_sync(
410     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
411     ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
412     {label => "clean work dirs"});
413   if ($exited != 0) {
414     save_meta();
415     exit(EX_RETRY_UNLOCKED);
416   }
417 }
418
419 # If this job requires a Docker image, install that.
420 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
421 if ($docker_locator = $Job->{docker_image_locator}) {
422   Log (undef, "Install docker image $docker_locator");
423   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
424   if (!$docker_hash)
425   {
426     croak("No Docker image hash found from locator $docker_locator");
427   }
428   Log (undef, "docker image hash is $docker_hash");
429   $docker_stream =~ s/^\.//;
430   my $docker_install_script = qq{
431 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
432     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
433 fi
434 };
435
436   my ($exited, $stdout, $stderr) = srun_sync(
437     ["srun", "--nodelist=" . join(',', @node)],
438     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
439     {label => "load docker image"});
440   if ($exited != 0)
441   {
442     save_meta();
443     exit(EX_RETRY_UNLOCKED);
444   }
445
446   # Determine whether this version of Docker supports memory+swap limits.
447   ($exited, $stdout, $stderr) = srun_sync(
448     ["srun", "--nodes=1"],
449     [$docker_bin, 'run', '--help'],
450     {label => "check --memory-swap feature"});
451   $docker_limitmem = ($stdout =~ /--memory-swap/);
452
453   # Find a non-root Docker user to use.
454   # Tries the default user for the container, then 'crunch', then 'nobody',
455   # testing for whether the actual user id is non-zero.  This defends against
456   # mistakes but not malice, but we intend to harden the security in the future
457   # so we don't want anyone getting used to their jobs running as root in their
458   # Docker containers.
459   my @tryusers = ("", "crunch", "nobody");
460   foreach my $try_user (@tryusers) {
461     my $label;
462     my $try_user_arg;
463     if ($try_user eq "") {
464       $label = "check whether default user is UID 0";
465       $try_user_arg = "";
466     } else {
467       $label = "check whether user '$try_user' is UID 0";
468       $try_user_arg = "--user=$try_user";
469     }
470     my ($exited, $stdout, $stderr) = srun_sync(
471       ["srun", "--nodes=1"],
472       ["/bin/sh", "-ec",
473        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
474       {label => $label});
475     chomp($stdout);
476     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
477       $dockeruserarg = $try_user_arg;
478       if ($try_user eq "") {
479         Log(undef, "Container will run with default user");
480       } else {
481         Log(undef, "Container will run with $dockeruserarg");
482       }
483       last;
484     }
485   }
486
487   if (!defined $dockeruserarg) {
488     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
489   }
490
491   if ($Job->{arvados_sdk_version}) {
492     # The job also specifies an Arvados SDK version.  Add the SDKs to the
493     # tar file for the build script to install.
494     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
495                        $Job->{arvados_sdk_version}));
496     add_git_archive("git", "--git-dir=$git_dir", "archive",
497                     "--prefix=.arvados.sdk/",
498                     $Job->{arvados_sdk_version}, "sdk");
499   }
500 }
501
502 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
503   # If script_version looks like an absolute path, *and* the --git-dir
504   # argument was not given -- which implies we were not invoked by
505   # crunch-dispatch -- we will use the given path as a working
506   # directory instead of resolving script_version to a git commit (or
507   # doing anything else with git).
508   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
509   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
510 }
511 else {
512   # Resolve the given script_version to a git commit sha1. Also, if
513   # the repository is remote, clone it into our local filesystem: this
514   # ensures "git archive" will work, and is necessary to reliably
515   # resolve a symbolic script_version like "master^".
516   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
517
518   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
519
520   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
521
522   # If we're running under crunch-dispatch, it will have already
523   # pulled the appropriate source tree into its own repository, and
524   # given us that repo's path as $git_dir.
525   #
526   # If we're running a "local" job, we might have to fetch content
527   # from a remote repository.
528   #
529   # (Currently crunch-dispatch gives a local path with --git-dir, but
530   # we might as well accept URLs there too in case it changes its
531   # mind.)
532   my $repo = $git_dir || $Job->{'repository'};
533
534   # Repository can be remote or local. If remote, we'll need to fetch it
535   # to a local dir before doing `git log` et al.
536   my $repo_location;
537
538   if ($repo =~ m{://|^[^/]*:}) {
539     # $repo is a git url we can clone, like git:// or https:// or
540     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
541     # not recognized here because distinguishing that from a local
542     # path is too fragile. If you really need something strange here,
543     # use the ssh:// form.
544     $repo_location = 'remote';
545   } elsif ($repo =~ m{^\.*/}) {
546     # $repo is a local path to a git index. We'll also resolve ../foo
547     # to ../foo/.git if the latter is a directory. To help
548     # disambiguate local paths from named hosted repositories, this
549     # form must be given as ./ or ../ if it's a relative path.
550     if (-d "$repo/.git") {
551       $repo = "$repo/.git";
552     }
553     $repo_location = 'local';
554   } else {
555     # $repo is none of the above. It must be the name of a hosted
556     # repository.
557     my $arv_repo_list = api_call("repositories/list",
558                                  'filters' => [['name','=',$repo]]);
559     my @repos_found = @{$arv_repo_list->{'items'}};
560     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
561     if ($n_found > 0) {
562       Log(undef, "Repository '$repo' -> "
563           . join(", ", map { $_->{'uuid'} } @repos_found));
564     }
565     if ($n_found != 1) {
566       croak("Error: Found $n_found repositories with name '$repo'.");
567     }
568     $repo = $repos_found[0]->{'fetch_url'};
569     $repo_location = 'remote';
570   }
571   Log(undef, "Using $repo_location repository '$repo'");
572   $ENV{"CRUNCH_SRC_URL"} = $repo;
573
574   # Resolve given script_version (we'll call that $treeish here) to a
575   # commit sha1 ($commit).
576   my $treeish = $Job->{'script_version'};
577   my $commit;
578   if ($repo_location eq 'remote') {
579     # We minimize excess object-fetching by re-using the same bare
580     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
581     # just keep adding remotes to it as needed.
582     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
583     my $gitcmd = "git --git-dir=\Q$local_repo\E";
584
585     # Set up our local repo for caching remote objects, making
586     # archives, etc.
587     if (!-d $local_repo) {
588       make_path($local_repo) or croak("Error: could not create $local_repo");
589     }
590     # This works (exits 0 and doesn't delete fetched objects) even
591     # if $local_repo is already initialized:
592     `$gitcmd init --bare`;
593     if ($?) {
594       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
595     }
596
597     # If $treeish looks like a hash (or abbrev hash) we look it up in
598     # our local cache first, since that's cheaper. (We don't want to
599     # do that with tags/branches though -- those change over time, so
600     # they should always be resolved by the remote repo.)
601     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
602       # Hide stderr because it's normal for this to fail:
603       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
604       if ($? == 0 &&
605           # Careful not to resolve a branch named abcdeff to commit 1234567:
606           $sha1 =~ /^$treeish/ &&
607           $sha1 =~ /^([0-9a-f]{40})$/s) {
608         $commit = $1;
609         Log(undef, "Commit $commit already present in $local_repo");
610       }
611     }
612
613     if (!defined $commit) {
614       # If $treeish isn't just a hash or abbrev hash, or isn't here
615       # yet, we need to fetch the remote to resolve it correctly.
616
617       # First, remove all local heads. This prevents a name that does
618       # not exist on the remote from resolving to (or colliding with)
619       # a previously fetched branch or tag (possibly from a different
620       # remote).
621       remove_tree("$local_repo/refs/heads", {keep_root => 1});
622
623       Log(undef, "Fetching objects from $repo to $local_repo");
624       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
625       if ($?) {
626         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
627       }
628     }
629
630     # Now that the data is all here, we will use our local repo for
631     # the rest of our git activities.
632     $repo = $local_repo;
633   }
634
635   my $gitcmd = "git --git-dir=\Q$repo\E";
636   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
637   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
638     croak("`$gitcmd rev-list` exited "
639           .exit_status_s($?)
640           .", '$treeish' not found, giving up");
641   }
642   $commit = $1;
643   Log(undef, "Version $treeish is commit $commit");
644
645   if ($commit ne $Job->{'script_version'}) {
646     # Record the real commit id in the database, frozentokey, logs,
647     # etc. -- instead of an abbreviation or a branch name which can
648     # become ambiguous or point to a different commit in the future.
649     if (!$Job->update_attributes('script_version' => $commit)) {
650       croak("Error: failed to update job's script_version attribute");
651     }
652   }
653
654   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
655   add_git_archive("$gitcmd archive ''\Q$commit\E");
656 }
657
658 my $git_archive = combined_git_archive();
659 if (!defined $git_archive) {
660   Log(undef, "Skip install phase (no git archive)");
661   if ($have_slurm) {
662     Log(undef, "Warning: This probably means workers have no source tree!");
663   }
664 }
665 else {
666   my $exited;
667   my $install_script_tries_left = 3;
668   for (my $attempts = 0; $attempts < 3; $attempts++) {
669     my @srunargs = ("srun",
670                     "--nodelist=$nodelist",
671                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
672     my @execargs = ("sh", "-c",
673                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
674
675     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
676     my ($stdout, $stderr);
677     ($exited, $stdout, $stderr) = srun_sync(
678       \@srunargs, \@execargs,
679       {label => "run install script on all workers"},
680       $build_script . $git_archive);
681
682     my $stderr_anything_from_script = 0;
683     for my $line (split(/\n/, $stderr)) {
684       if ($line !~ /^(srun: error: |starting: \[)/) {
685         $stderr_anything_from_script = 1;
686       }
687     }
688
689     last if $exited == 0 || $main::please_freeze;
690
691     # If the install script fails but doesn't print an error message,
692     # the next thing anyone is likely to do is just run it again in
693     # case it was a transient problem like "slurm communication fails
694     # because the network isn't reliable enough". So we'll just do
695     # that ourselves (up to 3 attempts in total). OTOH, if there is an
696     # error message, the problem is more likely to have a real fix and
697     # we should fail the job so the fixing process can start, instead
698     # of doing 2 more attempts.
699     last if $stderr_anything_from_script;
700   }
701
702   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
703     unlink($tar_filename);
704   }
705
706   if ($exited != 0) {
707     croak("Giving up");
708   }
709 }
710
711 foreach (qw (script script_version script_parameters runtime_constraints))
712 {
713   Log (undef,
714        "$_ " .
715        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
716 }
717 foreach (split (/\n/, $Job->{knobs}))
718 {
719   Log (undef, "knob " . $_);
720 }
721
722
723
724 $main::success = undef;
725
726
727
728 ONELEVEL:
729
730 my $thisround_succeeded = 0;
731 my $thisround_failed = 0;
732 my $thisround_failed_multiple = 0;
733 my $working_slot_count = scalar(@slot);
734
735 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
736                        or $a <=> $b } @jobstep_todo;
737 my $level = $jobstep[$jobstep_todo[0]]->{level};
738
739 my $initial_tasks_this_level = 0;
740 foreach my $id (@jobstep_todo) {
741   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
742 }
743
744 # If the number of tasks scheduled at this level #T is smaller than the number
745 # of slots available #S, only use the first #T slots, or the first slot on
746 # each node, whichever number is greater.
747 #
748 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
749 # based on these numbers.  Using fewer slots makes more resources available
750 # to each individual task, which should normally be a better strategy when
751 # there are fewer of them running with less parallelism.
752 #
753 # Note that this calculation is not redone if the initial tasks at
754 # this level queue more tasks at the same level.  This may harm
755 # overall task throughput for that level.
756 my @freeslot;
757 if ($initial_tasks_this_level < @node) {
758   @freeslot = (0..$#node);
759 } elsif ($initial_tasks_this_level < @slot) {
760   @freeslot = (0..$initial_tasks_this_level - 1);
761 } else {
762   @freeslot = (0..$#slot);
763 }
764 my $round_num_freeslots = scalar(@freeslot);
765 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
766
767 my %round_max_slots = ();
768 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
769   my $this_slot = $slot[$freeslot[$ii]];
770   my $node_name = $this_slot->{node}->{name};
771   $round_max_slots{$node_name} ||= $this_slot->{cpu};
772   last if (scalar(keys(%round_max_slots)) >= @node);
773 }
774
775 Log(undef, "start level $level with $round_num_freeslots slots");
776 my @holdslot;
777 my %reader;
778 my $progress_is_dirty = 1;
779 my $progress_stats_updated = 0;
780
781 update_progress_stats();
782
783
784 THISROUND:
785 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
786 {
787   # Don't create new tasks if we already know the job's final result.
788   last if defined($main::success);
789
790   my $id = $jobstep_todo[$todo_ptr];
791   my $Jobstep = $jobstep[$id];
792   if ($Jobstep->{level} != $level)
793   {
794     next;
795   }
796
797   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
798   set_nonblocking($reader{$id});
799
800   my $childslot = $freeslot[0];
801   my $childnode = $slot[$childslot]->{node};
802   my $childslotname = join (".",
803                             $slot[$childslot]->{node}->{name},
804                             $slot[$childslot]->{cpu});
805
806   my $childpid = fork();
807   if ($childpid == 0)
808   {
809     $SIG{'INT'} = 'DEFAULT';
810     $SIG{'QUIT'} = 'DEFAULT';
811     $SIG{'TERM'} = 'DEFAULT';
812
813     foreach (values (%reader))
814     {
815       close($_);
816     }
817     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
818     open(STDOUT,">&writer");
819     open(STDERR,">&writer");
820
821     undef $dbh;
822     undef $sth;
823
824     delete $ENV{"GNUPGHOME"};
825     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
826     $ENV{"TASK_QSEQUENCE"} = $id;
827     $ENV{"TASK_SEQUENCE"} = $level;
828     $ENV{"JOB_SCRIPT"} = $Job->{script};
829     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
830       $param =~ tr/a-z/A-Z/;
831       $ENV{"JOB_PARAMETER_$param"} = $value;
832     }
833     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
834     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
835     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
836     $ENV{"HOME"} = $ENV{"TASK_WORK"};
837     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
838     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
839     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
840
841     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
842
843     $ENV{"GZIP"} = "-n";
844
845     my @srunargs = (
846       "srun",
847       "--nodelist=".$childnode->{name},
848       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
849       "--job-name=$job_id.$id.$$",
850         );
851
852     my $stdbuf = " stdbuf --output=0 --error=0 ";
853
854     my $arv_file_cache = "";
855     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
856       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
857     }
858
859     my $command =
860         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
861         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
862         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
863         # These environment variables get used explicitly later in
864         # $command.  No tool is expected to read these values directly.
865         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
866         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
867         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
868         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
869
870     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
871     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
872     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
873
874     if ($docker_hash)
875     {
876       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
877       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
878       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
879       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
880       # We only set memory limits if Docker lets us limit both memory and swap.
881       # Memory limits alone have been supported longer, but subprocesses tend
882       # to get SIGKILL if they exceed that without any swap limit set.
883       # See #5642 for additional background.
884       if ($docker_limitmem) {
885         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
886       }
887
888       # The source tree and $destdir directory (which we have
889       # installed on the worker host) are available in the container,
890       # under the same path.
891       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
892       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
893
894       # Currently, we make the "by_pdh" directory in arv-mount's mount
895       # point appear at /keep inside the container (instead of using
896       # the same path as the host like we do with CRUNCH_SRC and
897       # CRUNCH_INSTALL). However, crunch scripts and utilities must
898       # not rely on this. They must use $TASK_KEEPMOUNT.
899       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
900       $ENV{TASK_KEEPMOUNT} = "/keep";
901
902       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
903       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
904       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
905
906       # TASK_WORK is almost exactly like a docker data volume: it
907       # starts out empty, is writable, and persists until no
908       # containers use it any more. We don't use --volumes-from to
909       # share it with other containers: it is only accessible to this
910       # task, and it goes away when this task stops.
911       #
912       # However, a docker data volume is writable only by root unless
913       # the mount point already happens to exist in the container with
914       # different permissions. Therefore, we [1] assume /tmp already
915       # exists in the image and is writable by the crunch user; [2]
916       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
917       # writable if they are created by docker while setting up the
918       # other --volumes); and [3] create $TASK_WORK inside the
919       # container using $build_script.
920       $command .= "--volume=/tmp ";
921       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
922       $ENV{"HOME"} = $ENV{"TASK_WORK"};
923       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
924
925       # TODO: Share a single JOB_WORK volume across all task
926       # containers on a given worker node, and delete it when the job
927       # ends (and, in case that doesn't work, when the next job
928       # starts).
929       #
930       # For now, use the same approach as TASK_WORK above.
931       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
932
933       # Bind mount the crunchrunner binary and host TLS certificates file into
934       # the container.
935       $command .= "--volume=\Q$ENV{HOST_CRUNCHRUNNER_BIN}:/usr/local/bin/crunchrunner\E ";
936       $command .= "--volume=\Q$ENV{HOST_CERTS}:/etc/arvados/ca-certificates.crt\E ";
937
938       while (my ($env_key, $env_val) = each %ENV)
939       {
940         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
941           $command .= "--env=\Q$env_key=$env_val\E ";
942         }
943       }
944       $command .= "--env=\QHOME=$ENV{HOME}\E ";
945       $command .= "\Q$docker_hash\E ";
946
947       if ($Job->{arvados_sdk_version}) {
948         $command .= $stdbuf;
949         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
950       } else {
951         $command .= "/bin/sh -c \'python -c " .
952             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
953             ">&2 2>/dev/null; " .
954             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
955             "if which stdbuf >/dev/null ; then " .
956             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
957             " else " .
958             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
959             " fi\'";
960       }
961     } else {
962       # Non-docker run
963       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
964       $command .= $stdbuf;
965       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
966     }
967
968     my @execargs = ('bash', '-c', $command);
969     srun (\@srunargs, \@execargs, undef, $build_script);
970     # exec() failed, we assume nothing happened.
971     die "srun() failed on build script\n";
972   }
973   close("writer");
974   if (!defined $childpid)
975   {
976     close $reader{$id};
977     delete $reader{$id};
978     next;
979   }
980   shift @freeslot;
981   $proc{$childpid} = {
982     jobstepidx => $id,
983     time => time,
984     slot => $childslot,
985     jobstepname => "$job_id.$id.$childpid",
986   };
987   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
988   $slot[$childslot]->{pid} = $childpid;
989
990   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
991   Log ($id, "child $childpid started on $childslotname");
992   $Jobstep->{starttime} = time;
993   $Jobstep->{node} = $childnode->{name};
994   $Jobstep->{slotindex} = $childslot;
995   delete $Jobstep->{stderr};
996   delete $Jobstep->{finishtime};
997   delete $Jobstep->{tempfail};
998
999   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1000   $Jobstep->{'arvados_task'}->save;
1001
1002   splice @jobstep_todo, $todo_ptr, 1;
1003   --$todo_ptr;
1004
1005   $progress_is_dirty = 1;
1006
1007   while (!@freeslot
1008          ||
1009          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1010   {
1011     last THISROUND if $main::please_freeze;
1012     if ($main::please_info)
1013     {
1014       $main::please_info = 0;
1015       freeze();
1016       create_output_collection();
1017       save_meta(1);
1018       update_progress_stats();
1019     }
1020     my $gotsome
1021         = readfrompipes ()
1022         + reapchildren ();
1023     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1024     {
1025       check_refresh_wanted();
1026       check_squeue();
1027       update_progress_stats();
1028     }
1029     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1030     {
1031       update_progress_stats();
1032     }
1033     if (!$gotsome) {
1034       select (undef, undef, undef, 0.1);
1035     }
1036     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1037                                         $_->{node}->{hold_count} < 4 } @slot);
1038     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1039         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1040     {
1041       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1042           .($thisround_failed+$thisround_succeeded)
1043           .") -- giving up on this round";
1044       Log (undef, $message);
1045       last THISROUND;
1046     }
1047
1048     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1049     for (my $i=$#freeslot; $i>=0; $i--) {
1050       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1051         push @holdslot, (splice @freeslot, $i, 1);
1052       }
1053     }
1054     for (my $i=$#holdslot; $i>=0; $i--) {
1055       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1056         push @freeslot, (splice @holdslot, $i, 1);
1057       }
1058     }
1059
1060     # give up if no nodes are succeeding
1061     if ($working_slot_count < 1) {
1062       Log(undef, "Every node has failed -- giving up");
1063       last THISROUND;
1064     }
1065   }
1066 }
1067
1068
1069 push @freeslot, splice @holdslot;
1070 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1071
1072
1073 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1074 while (%proc)
1075 {
1076   if ($main::please_continue) {
1077     $main::please_continue = 0;
1078     goto THISROUND;
1079   }
1080   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1081   readfrompipes ();
1082   if (!reapchildren())
1083   {
1084     check_refresh_wanted();
1085     check_squeue();
1086     update_progress_stats();
1087     select (undef, undef, undef, 0.1);
1088     killem (keys %proc) if $main::please_freeze;
1089   }
1090 }
1091
1092 update_progress_stats();
1093 freeze_if_want_freeze();
1094
1095
1096 if (!defined $main::success)
1097 {
1098   if (!@jobstep_todo) {
1099     $main::success = 1;
1100   } elsif ($working_slot_count < 1) {
1101     save_output_collection();
1102     save_meta();
1103     exit(EX_RETRY_UNLOCKED);
1104   } elsif ($thisround_succeeded == 0 &&
1105            ($thisround_failed == 0 || $thisround_failed > 4)) {
1106     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1107     Log (undef, $message);
1108     $main::success = 0;
1109   }
1110 }
1111
1112 goto ONELEVEL if !defined $main::success;
1113
1114
1115 release_allocation();
1116 freeze();
1117 my $collated_output = save_output_collection();
1118 Log (undef, "finish");
1119
1120 save_meta();
1121
1122 my $final_state;
1123 if ($collated_output && $main::success) {
1124   $final_state = 'Complete';
1125 } else {
1126   $final_state = 'Failed';
1127 }
1128 $Job->update_attributes('state' => $final_state);
1129
1130 exit (($final_state eq 'Complete') ? 0 : 1);
1131
1132
1133
1134 sub update_progress_stats
1135 {
1136   $progress_stats_updated = time;
1137   return if !$progress_is_dirty;
1138   my ($todo, $done, $running) = (scalar @jobstep_todo,
1139                                  scalar @jobstep_done,
1140                                  scalar keys(%proc));
1141   $Job->{'tasks_summary'} ||= {};
1142   $Job->{'tasks_summary'}->{'todo'} = $todo;
1143   $Job->{'tasks_summary'}->{'done'} = $done;
1144   $Job->{'tasks_summary'}->{'running'} = $running;
1145   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1146   Log (undef, "status: $done done, $running running, $todo todo");
1147   $progress_is_dirty = 0;
1148 }
1149
1150
1151
1152 sub reapchildren
1153 {
1154   my $children_reaped = 0;
1155   my @successful_task_uuids = ();
1156
1157   while((my $pid = waitpid (-1, WNOHANG)) > 0)
1158   {
1159     my $childstatus = $?;
1160
1161     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1162                     . "."
1163                     . $slot[$proc{$pid}->{slot}]->{cpu});
1164     my $jobstepidx = $proc{$pid}->{jobstepidx};
1165
1166     if (!WIFEXITED($childstatus))
1167     {
1168       # child did not exit (may be temporarily stopped)
1169       Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
1170       next;
1171     }
1172
1173     $children_reaped++;
1174     my $elapsed = time - $proc{$pid}->{time};
1175     my $Jobstep = $jobstep[$jobstepidx];
1176
1177     my $exitvalue = $childstatus >> 8;
1178     my $exitinfo = "exit ".exit_status_s($childstatus);
1179     $Jobstep->{'arvados_task'}->reload;
1180     my $task_success = $Jobstep->{'arvados_task'}->{success};
1181
1182     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1183
1184     if (!defined $task_success) {
1185       # task did not indicate one way or the other --> fail
1186       Log($jobstepidx, sprintf(
1187             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1188             exit_status_s($childstatus)));
1189       $Jobstep->{'arvados_task'}->{success} = 0;
1190       $Jobstep->{'arvados_task'}->save;
1191       $task_success = 0;
1192     }
1193
1194     if (!$task_success)
1195     {
1196       my $temporary_fail;
1197       $temporary_fail ||= $Jobstep->{tempfail};
1198       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1199
1200       ++$thisround_failed;
1201       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1202
1203       # Check for signs of a failed or misconfigured node
1204       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1205           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1206         # Don't count this against jobstep failure thresholds if this
1207         # node is already suspected faulty and srun exited quickly
1208         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1209             $elapsed < 5) {
1210           Log ($jobstepidx, "blaming failure on suspect node " .
1211                $slot[$proc{$pid}->{slot}]->{node}->{name});
1212           $temporary_fail ||= 1;
1213         }
1214         ban_node_by_slot($proc{$pid}->{slot});
1215       }
1216
1217       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1218                                 ++$Jobstep->{'failures'},
1219                                 $temporary_fail ? 'temporary' : 'permanent',
1220                                 $elapsed));
1221
1222       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1223         # Give up on this task, and the whole job
1224         $main::success = 0;
1225       }
1226       # Put this task back on the todo queue
1227       push @jobstep_todo, $jobstepidx;
1228       $Job->{'tasks_summary'}->{'failed'}++;
1229     }
1230     else # task_success
1231     {
1232       push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1233       ++$thisround_succeeded;
1234       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1235       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1236       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1237       push @jobstep_done, $jobstepidx;
1238       Log ($jobstepidx, "success in $elapsed seconds");
1239     }
1240     $Jobstep->{exitcode} = $childstatus;
1241     $Jobstep->{finishtime} = time;
1242     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1243     $Jobstep->{'arvados_task'}->save;
1244     process_stderr_final ($jobstepidx);
1245     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1246                               length($Jobstep->{'arvados_task'}->{output}),
1247                               $Jobstep->{'arvados_task'}->{output}));
1248
1249     close $reader{$jobstepidx};
1250     delete $reader{$jobstepidx};
1251     delete $slot[$proc{$pid}->{slot}]->{pid};
1252     push @freeslot, $proc{$pid}->{slot};
1253     delete $proc{$pid};
1254
1255     $progress_is_dirty = 1;
1256   }
1257
1258   if (scalar(@successful_task_uuids) > 0)
1259   {
1260     Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1261     # Load new tasks
1262     my $newtask_list = [];
1263     my $newtask_results;
1264     do {
1265       $newtask_results = api_call(
1266         "job_tasks/list",
1267         'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1268         'order' => 'qsequence',
1269         'offset' => scalar(@$newtask_list),
1270           );
1271       push(@$newtask_list, @{$newtask_results->{items}});
1272     } while (@{$newtask_results->{items}});
1273     Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1274     foreach my $arvados_task (@$newtask_list) {
1275       my $jobstep = {
1276         'level' => $arvados_task->{'sequence'},
1277         'failures' => 0,
1278         'arvados_task' => $arvados_task
1279       };
1280       push @jobstep, $jobstep;
1281       push @jobstep_todo, $#jobstep;
1282     }
1283   }
1284
1285   return $children_reaped;
1286 }
1287
1288 sub check_refresh_wanted
1289 {
1290   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1291   if (@stat &&
1292       $stat[9] > $latest_refresh &&
1293       # ...and we have actually locked the job record...
1294       $job_id eq $Job->{'uuid'}) {
1295     $latest_refresh = scalar time;
1296     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1297     for my $attr ('cancelled_at',
1298                   'cancelled_by_user_uuid',
1299                   'cancelled_by_client_uuid',
1300                   'state') {
1301       $Job->{$attr} = $Job2->{$attr};
1302     }
1303     if ($Job->{'state'} ne "Running") {
1304       if ($Job->{'state'} eq "Cancelled") {
1305         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1306       } else {
1307         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1308       }
1309       $main::success = 0;
1310       $main::please_freeze = 1;
1311     }
1312   }
1313 }
1314
1315 sub check_squeue
1316 {
1317   my $last_squeue_check = $squeue_checked;
1318
1319   # Do not call `squeue` or check the kill list more than once every
1320   # 15 seconds.
1321   return if $last_squeue_check > time - 15;
1322   $squeue_checked = time;
1323
1324   # Look for children from which we haven't received stderr data since
1325   # the last squeue check. If no such children exist, all procs are
1326   # alive and there's no need to even look at squeue.
1327   #
1328   # As long as the crunchstat poll interval (10s) is shorter than the
1329   # squeue check interval (15s) this should make the squeue check an
1330   # infrequent event.
1331   my $silent_procs = 0;
1332   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1333   {
1334     if (!exists($js->{stderr_at}))
1335     {
1336       $js->{stderr_at} = 0;
1337     }
1338     if ($js->{stderr_at} < $last_squeue_check)
1339     {
1340       $silent_procs++;
1341     }
1342   }
1343   return if $silent_procs == 0;
1344
1345   # use killem() on procs whose killtime is reached
1346   while (my ($pid, $procinfo) = each %proc)
1347   {
1348     my $js = $jobstep[$procinfo->{jobstepidx}];
1349     if (exists $procinfo->{killtime}
1350         && $procinfo->{killtime} <= time
1351         && $js->{stderr_at} < $last_squeue_check)
1352     {
1353       my $sincewhen = "";
1354       if ($js->{stderr_at}) {
1355         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1356       }
1357       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1358       killem ($pid);
1359     }
1360   }
1361
1362   if (!$have_slurm)
1363   {
1364     # here is an opportunity to check for mysterious problems with local procs
1365     return;
1366   }
1367
1368   # Get a list of steps still running.  Note: squeue(1) says --steps
1369   # selects a format (which we override anyway) and allows us to
1370   # specify which steps we're interested in (which we don't).
1371   # Importantly, it also changes the meaning of %j from "job name" to
1372   # "step name" and (although this isn't mentioned explicitly in the
1373   # docs) switches from "one line per job" mode to "one line per step"
1374   # mode. Without it, we'd just get a list of one job, instead of a
1375   # list of N steps.
1376   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1377   if ($? != 0)
1378   {
1379     Log(undef, "warning: squeue exit status $? ($!)");
1380     return;
1381   }
1382   chop @squeue;
1383
1384   # which of my jobsteps are running, according to squeue?
1385   my %ok;
1386   for my $jobstepname (@squeue)
1387   {
1388     $ok{$jobstepname} = 1;
1389   }
1390
1391   # Check for child procs >60s old and not mentioned by squeue.
1392   while (my ($pid, $procinfo) = each %proc)
1393   {
1394     if ($procinfo->{time} < time - 60
1395         && $procinfo->{jobstepname}
1396         && !exists $ok{$procinfo->{jobstepname}}
1397         && !exists $procinfo->{killtime})
1398     {
1399       # According to slurm, this task has ended (successfully or not)
1400       # -- but our srun child hasn't exited. First we must wait (30
1401       # seconds) in case this is just a race between communication
1402       # channels. Then, if our srun child process still hasn't
1403       # terminated, we'll conclude some slurm communication
1404       # error/delay has caused the task to die without notifying srun,
1405       # and we'll kill srun ourselves.
1406       $procinfo->{killtime} = time + 30;
1407       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1408     }
1409   }
1410 }
1411
1412
1413 sub release_allocation
1414 {
1415   if ($have_slurm)
1416   {
1417     Log (undef, "release job allocation");
1418     system "scancel $ENV{SLURM_JOB_ID}";
1419   }
1420 }
1421
1422
1423 sub readfrompipes
1424 {
1425   my $gotsome = 0;
1426   my %fd_job;
1427   my $sel = IO::Select->new();
1428   foreach my $jobstepidx (keys %reader)
1429   {
1430     my $fd = $reader{$jobstepidx};
1431     $sel->add($fd);
1432     $fd_job{$fd} = $jobstepidx;
1433
1434     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1435       $sel->add($stdout_fd);
1436       $fd_job{$stdout_fd} = $jobstepidx;
1437     }
1438   }
1439   # select on all reader fds with 0.1s timeout
1440   my @ready_fds = $sel->can_read(0.1);
1441   foreach my $fd (@ready_fds)
1442   {
1443     my $buf;
1444     if (0 < sysread ($fd, $buf, 65536))
1445     {
1446       $gotsome = 1;
1447       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1448
1449       my $jobstepidx = $fd_job{$fd};
1450       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1451         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1452         next;
1453       }
1454
1455       $jobstep[$jobstepidx]->{stderr_at} = time;
1456       $jobstep[$jobstepidx]->{stderr} .= $buf;
1457
1458       # Consume everything up to the last \n
1459       preprocess_stderr ($jobstepidx);
1460
1461       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1462       {
1463         # If we get a lot of stderr without a newline, chop off the
1464         # front to avoid letting our buffer grow indefinitely.
1465         substr ($jobstep[$jobstepidx]->{stderr},
1466                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1467       }
1468     }
1469   }
1470   return $gotsome;
1471 }
1472
1473
1474 # Consume all full lines of stderr for a jobstep. Everything after the
1475 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1476 # returning.
1477 sub preprocess_stderr
1478 {
1479   my $jobstepidx = shift;
1480
1481   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1482     my $line = $1;
1483     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1484     Log ($jobstepidx, "stderr $line");
1485     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1486       # whoa.
1487       $main::please_freeze = 1;
1488     }
1489     elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
1490       # Skip the following tempfail checks if this srun proc isn't
1491       # attached to a particular worker slot.
1492     }
1493     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1494       my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1495       $slot[$job_slot_index]->{node}->{fail_count}++;
1496       $jobstep[$jobstepidx]->{tempfail} = 1;
1497       ban_node_by_slot($job_slot_index);
1498     }
1499     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1500       $jobstep[$jobstepidx]->{tempfail} = 1;
1501       ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
1502     }
1503     elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1504       $jobstep[$jobstepidx]->{tempfail} = 1;
1505     }
1506   }
1507 }
1508
1509
1510 sub process_stderr_final
1511 {
1512   my $jobstepidx = shift;
1513   preprocess_stderr ($jobstepidx);
1514
1515   map {
1516     Log ($jobstepidx, "stderr $_");
1517   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1518   $jobstep[$jobstepidx]->{stderr} = '';
1519 }
1520
1521 sub fetch_block
1522 {
1523   my $hash = shift;
1524   my $keep;
1525   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1526     Log(undef, "fetch_block run error from arv-get $hash: $!");
1527     return undef;
1528   }
1529   my $output_block = "";
1530   while (1) {
1531     my $buf;
1532     my $bytes = sysread($keep, $buf, 1024 * 1024);
1533     if (!defined $bytes) {
1534       Log(undef, "fetch_block read error from arv-get: $!");
1535       $output_block = undef;
1536       last;
1537     } elsif ($bytes == 0) {
1538       # sysread returns 0 at the end of the pipe.
1539       last;
1540     } else {
1541       # some bytes were read into buf.
1542       $output_block .= $buf;
1543     }
1544   }
1545   close $keep;
1546   if ($?) {
1547     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1548     $output_block = undef;
1549   }
1550   return $output_block;
1551 }
1552
1553 # Create a collection by concatenating the output of all tasks (each
1554 # task's output is either a manifest fragment, a locator for a
1555 # manifest fragment stored in Keep, or nothing at all). Return the
1556 # portable_data_hash of the new collection.
1557 sub create_output_collection
1558 {
1559   Log (undef, "collate");
1560
1561   my ($child_out, $child_in);
1562   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1563 import arvados
1564 import sys
1565 print (arvados.api("v1").collections().
1566        create(body={"manifest_text": sys.stdin.read()}).
1567        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1568 }, retry_count());
1569
1570   my $task_idx = -1;
1571   my $manifest_size = 0;
1572   for (@jobstep)
1573   {
1574     ++$task_idx;
1575     my $output = $_->{'arvados_task'}->{output};
1576     next if (!defined($output));
1577     my $next_write;
1578     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1579       $next_write = fetch_block($output);
1580     } else {
1581       $next_write = $output;
1582     }
1583     if (defined($next_write)) {
1584       if (!defined(syswrite($child_in, $next_write))) {
1585         # There's been an error writing.  Stop the loop.
1586         # We'll log details about the exit code later.
1587         last;
1588       } else {
1589         $manifest_size += length($next_write);
1590       }
1591     } else {
1592       my $uuid = $_->{'arvados_task'}->{'uuid'};
1593       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1594       $main::success = 0;
1595     }
1596   }
1597   close($child_in);
1598   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1599
1600   my $joboutput;
1601   my $s = IO::Select->new($child_out);
1602   if ($s->can_read(120)) {
1603     sysread($child_out, $joboutput, 1024 * 1024);
1604     waitpid($pid, 0);
1605     if ($?) {
1606       Log(undef, "output collection creation exited " . exit_status_s($?));
1607       $joboutput = undef;
1608     } else {
1609       chomp($joboutput);
1610     }
1611   } else {
1612     Log (undef, "timed out while creating output collection");
1613     foreach my $signal (2, 2, 2, 15, 15, 9) {
1614       kill($signal, $pid);
1615       last if waitpid($pid, WNOHANG) == -1;
1616       sleep(1);
1617     }
1618   }
1619   close($child_out);
1620
1621   return $joboutput;
1622 }
1623
1624 # Calls create_output_collection, logs the result, and returns it.
1625 # If that was successful, save that as the output in the job record.
1626 sub save_output_collection {
1627   my $collated_output = create_output_collection();
1628
1629   if (!$collated_output) {
1630     Log(undef, "Failed to write output collection");
1631   }
1632   else {
1633     Log(undef, "job output $collated_output");
1634     $Job->update_attributes('output' => $collated_output);
1635   }
1636   return $collated_output;
1637 }
1638
1639 sub killem
1640 {
1641   foreach (@_)
1642   {
1643     my $sig = 2;                # SIGINT first
1644     if (exists $proc{$_}->{"sent_$sig"} &&
1645         time - $proc{$_}->{"sent_$sig"} > 4)
1646     {
1647       $sig = 15;                # SIGTERM if SIGINT doesn't work
1648     }
1649     if (exists $proc{$_}->{"sent_$sig"} &&
1650         time - $proc{$_}->{"sent_$sig"} > 4)
1651     {
1652       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1653     }
1654     if (!exists $proc{$_}->{"sent_$sig"})
1655     {
1656       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1657       kill $sig, $_;
1658       select (undef, undef, undef, 0.1);
1659       if ($sig == 2)
1660       {
1661         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1662       }
1663       $proc{$_}->{"sent_$sig"} = time;
1664       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1665     }
1666   }
1667 }
1668
1669
1670 sub fhbits
1671 {
1672   my($bits);
1673   for (@_) {
1674     vec($bits,fileno($_),1) = 1;
1675   }
1676   $bits;
1677 }
1678
1679
1680 # Send log output to Keep via arv-put.
1681 #
1682 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1683 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1684 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1685 # $log_pipe_pid is the pid of the arv-put subprocess.
1686 #
1687 # The only functions that should access these variables directly are:
1688 #
1689 # log_writer_start($logfilename)
1690 #     Starts an arv-put pipe, reading data on stdin and writing it to
1691 #     a $logfilename file in an output collection.
1692 #
1693 # log_writer_read_output([$timeout])
1694 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1695 #     Passes $timeout to the select() call, with a default of 0.01.
1696 #     Returns the result of the last read() call on $log_pipe_out, or
1697 #     -1 if read() wasn't called because select() timed out.
1698 #     Only other log_writer_* functions should need to call this.
1699 #
1700 # log_writer_send($txt)
1701 #     Writes $txt to the output log collection.
1702 #
1703 # log_writer_finish()
1704 #     Closes the arv-put pipe and returns the output that it produces.
1705 #
1706 # log_writer_is_active()
1707 #     Returns a true value if there is currently a live arv-put
1708 #     process, false otherwise.
1709 #
1710 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1711     $log_pipe_pid);
1712
1713 sub log_writer_start($)
1714 {
1715   my $logfilename = shift;
1716   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1717                         'arv-put',
1718                         '--stream',
1719                         '--retries', '3',
1720                         '--filename', $logfilename,
1721                         '-');
1722   $log_pipe_out_buf = "";
1723   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1724 }
1725
1726 sub log_writer_read_output {
1727   my $timeout = shift || 0.01;
1728   my $read = -1;
1729   while ($read && $log_pipe_out_select->can_read($timeout)) {
1730     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1731                  length($log_pipe_out_buf));
1732   }
1733   if (!defined($read)) {
1734     Log(undef, "error reading log manifest from arv-put: $!");
1735   }
1736   return $read;
1737 }
1738
1739 sub log_writer_send($)
1740 {
1741   my $txt = shift;
1742   print $log_pipe_in $txt;
1743   log_writer_read_output();
1744 }
1745
1746 sub log_writer_finish()
1747 {
1748   return unless $log_pipe_pid;
1749
1750   close($log_pipe_in);
1751
1752   my $logger_failed = 0;
1753   my $read_result = log_writer_read_output(120);
1754   if ($read_result == -1) {
1755     $logger_failed = -1;
1756     Log (undef, "timed out reading from 'arv-put'");
1757   } elsif ($read_result != 0) {
1758     $logger_failed = -2;
1759     Log(undef, "failed to read arv-put log manifest to EOF");
1760   }
1761
1762   waitpid($log_pipe_pid, 0);
1763   if ($?) {
1764     $logger_failed ||= $?;
1765     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1766   }
1767
1768   close($log_pipe_out);
1769   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1770   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1771       $log_pipe_out_select = undef;
1772
1773   return $arv_put_output;
1774 }
1775
1776 sub log_writer_is_active() {
1777   return $log_pipe_pid;
1778 }
1779
1780 sub Log                         # ($jobstepidx, $logmessage)
1781 {
1782   my ($jobstepidx, $logmessage) = @_;
1783   if ($logmessage =~ /\n/) {
1784     for my $line (split (/\n/, $_[1])) {
1785       Log ($jobstepidx, $line);
1786     }
1787     return;
1788   }
1789   my $fh = select STDERR; $|=1; select $fh;
1790   my $task_qseq = '';
1791   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1792     $task_qseq = $jobstepidx;
1793   }
1794   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1795   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1796   $message .= "\n";
1797   my $datetime;
1798   if (log_writer_is_active() || -t STDERR) {
1799     my @gmtime = gmtime;
1800     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1801                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1802   }
1803   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1804
1805   if (log_writer_is_active()) {
1806     log_writer_send($datetime . " " . $message);
1807   }
1808 }
1809
1810
1811 sub croak
1812 {
1813   my ($package, $file, $line) = caller;
1814   my $message = "@_ at $file line $line\n";
1815   Log (undef, $message);
1816   freeze() if @jobstep_todo;
1817   create_output_collection() if @jobstep_todo;
1818   cleanup();
1819   save_meta();
1820   die;
1821 }
1822
1823
1824 sub cleanup
1825 {
1826   return unless $Job;
1827   if ($Job->{'state'} eq 'Cancelled') {
1828     $Job->update_attributes('finished_at' => scalar gmtime);
1829   } else {
1830     $Job->update_attributes('state' => 'Failed');
1831   }
1832 }
1833
1834
1835 sub save_meta
1836 {
1837   my $justcheckpoint = shift; # false if this will be the last meta saved
1838   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1839   return unless log_writer_is_active();
1840   my $log_manifest = log_writer_finish();
1841   return unless defined($log_manifest);
1842
1843   if ($Job->{log}) {
1844     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1845     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1846   }
1847
1848   my $log_coll = api_call(
1849     "collections/create", ensure_unique_name => 1, collection => {
1850       manifest_text => $log_manifest,
1851       owner_uuid => $Job->{owner_uuid},
1852       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1853     });
1854   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1855   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1856 }
1857
1858
1859 sub freeze_if_want_freeze
1860 {
1861   if ($main::please_freeze)
1862   {
1863     release_allocation();
1864     if (@_)
1865     {
1866       # kill some srun procs before freeze+stop
1867       map { $proc{$_} = {} } @_;
1868       while (%proc)
1869       {
1870         killem (keys %proc);
1871         select (undef, undef, undef, 0.1);
1872         my $died;
1873         while (($died = waitpid (-1, WNOHANG)) > 0)
1874         {
1875           delete $proc{$died};
1876         }
1877       }
1878     }
1879     freeze();
1880     create_output_collection();
1881     cleanup();
1882     save_meta();
1883     exit 1;
1884   }
1885 }
1886
1887
1888 sub freeze
1889 {
1890   Log (undef, "Freeze not implemented");
1891   return;
1892 }
1893
1894
1895 sub thaw
1896 {
1897   croak ("Thaw not implemented");
1898 }
1899
1900
1901 sub freezequote
1902 {
1903   my $s = shift;
1904   $s =~ s/\\/\\\\/g;
1905   $s =~ s/\n/\\n/g;
1906   return $s;
1907 }
1908
1909
1910 sub freezeunquote
1911 {
1912   my $s = shift;
1913   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1914   return $s;
1915 }
1916
1917
1918 sub srun_sync
1919 {
1920   my $srunargs = shift;
1921   my $execargs = shift;
1922   my $opts = shift || {};
1923   my $stdin = shift;
1924
1925   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1926   Log (undef, "$label: start");
1927
1928   my ($stderr_r, $stderr_w);
1929   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1930
1931   my ($stdout_r, $stdout_w);
1932   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1933
1934   my $srunpid = fork();
1935   if ($srunpid == 0)
1936   {
1937     close($stderr_r);
1938     close($stdout_r);
1939     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1940     fcntl($stdout_w, F_SETFL, 0) or croak($!);
1941     open(STDERR, ">&", $stderr_w);
1942     open(STDOUT, ">&", $stdout_w);
1943     srun ($srunargs, $execargs, $opts, $stdin);
1944     exit (1);
1945   }
1946   close($stderr_w);
1947   close($stdout_w);
1948
1949   set_nonblocking($stderr_r);
1950   set_nonblocking($stdout_r);
1951
1952   # Add entries to @jobstep and %proc so check_squeue() and
1953   # freeze_if_want_freeze() can treat it like a job task process.
1954   push @jobstep, {
1955     stderr => '',
1956     stderr_at => 0,
1957     stderr_captured => '',
1958     stdout_r => $stdout_r,
1959     stdout_captured => '',
1960   };
1961   my $jobstepidx = $#jobstep;
1962   $proc{$srunpid} = {
1963     jobstepidx => $jobstepidx,
1964   };
1965   $reader{$jobstepidx} = $stderr_r;
1966
1967   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
1968     my $busy = readfrompipes();
1969     if (!$busy || ($latest_refresh + 2 < scalar time)) {
1970       check_refresh_wanted();
1971       check_squeue();
1972     }
1973     if (!$busy) {
1974       select(undef, undef, undef, 0.1);
1975     }
1976     killem(keys %proc) if $main::please_freeze;
1977   }
1978   my $exited = $?;
1979
1980   1 while readfrompipes();
1981   process_stderr_final ($jobstepidx);
1982
1983   Log (undef, "$label: exit ".exit_status_s($exited));
1984
1985   close($stdout_r);
1986   close($stderr_r);
1987   delete $proc{$srunpid};
1988   delete $reader{$jobstepidx};
1989
1990   my $j = pop @jobstep;
1991   return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
1992 }
1993
1994
1995 sub srun
1996 {
1997   my $srunargs = shift;
1998   my $execargs = shift;
1999   my $opts = shift || {};
2000   my $stdin = shift;
2001   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
2002
2003   $Data::Dumper::Terse = 1;
2004   $Data::Dumper::Indent = 0;
2005   my $show_cmd = Dumper($args);
2006   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2007   $show_cmd =~ s/\n/ /g;
2008   if ($opts->{fork}) {
2009     Log(undef, "starting: $show_cmd");
2010   } else {
2011     # This is a child process: parent is in charge of reading our
2012     # stderr and copying it to Log() if needed.
2013     warn "starting: $show_cmd\n";
2014   }
2015
2016   if (defined $stdin) {
2017     my $child = open STDIN, "-|";
2018     defined $child or die "no fork: $!";
2019     if ($child == 0) {
2020       print $stdin or die $!;
2021       close STDOUT or die $!;
2022       exit 0;
2023     }
2024   }
2025
2026   return system (@$args) if $opts->{fork};
2027
2028   exec @$args;
2029   warn "ENV size is ".length(join(" ",%ENV));
2030   die "exec failed: $!: @$args";
2031 }
2032
2033
2034 sub ban_node_by_slot {
2035   # Don't start any new jobsteps on this node for 60 seconds
2036   my $slotid = shift;
2037   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2038   $slot[$slotid]->{node}->{hold_count}++;
2039   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2040 }
2041
2042 sub must_lock_now
2043 {
2044   my ($lockfile, $error_message) = @_;
2045   open L, ">", $lockfile or croak("$lockfile: $!");
2046   if (!flock L, LOCK_EX|LOCK_NB) {
2047     croak("Can't lock $lockfile: $error_message\n");
2048   }
2049 }
2050
2051 sub find_docker_image {
2052   # Given a Keep locator, check to see if it contains a Docker image.
2053   # If so, return its stream name and Docker hash.
2054   # If not, return undef for both values.
2055   my $locator = shift;
2056   my ($streamname, $filename);
2057   my $image = api_call("collections/get", uuid => $locator);
2058   if ($image) {
2059     foreach my $line (split(/\n/, $image->{manifest_text})) {
2060       my @tokens = split(/\s+/, $line);
2061       next if (!@tokens);
2062       $streamname = shift(@tokens);
2063       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2064         if (defined($filename)) {
2065           return (undef, undef);  # More than one file in the Collection.
2066         } else {
2067           $filename = (split(/:/, $filedata, 3))[2];
2068         }
2069       }
2070     }
2071   }
2072   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
2073     return ($streamname, $1);
2074   } else {
2075     return (undef, undef);
2076   }
2077 }
2078
2079 sub retry_count {
2080   # Calculate the number of times an operation should be retried,
2081   # assuming exponential backoff, and that we're willing to retry as
2082   # long as tasks have been running.  Enforce a minimum of 3 retries.
2083   my ($starttime, $endtime, $timediff, $retries);
2084   if (@jobstep) {
2085     $starttime = $jobstep[0]->{starttime};
2086     $endtime = $jobstep[-1]->{finishtime};
2087   }
2088   if (!defined($starttime)) {
2089     $timediff = 0;
2090   } elsif (!defined($endtime)) {
2091     $timediff = time - $starttime;
2092   } else {
2093     $timediff = ($endtime - $starttime) - (time - $endtime);
2094   }
2095   if ($timediff > 0) {
2096     $retries = int(log($timediff) / log(2));
2097   } else {
2098     $retries = 1;  # Use the minimum.
2099   }
2100   return ($retries > 3) ? $retries : 3;
2101 }
2102
2103 sub retry_op {
2104   # Pass in two function references.
2105   # This method will be called with the remaining arguments.
2106   # If it dies, retry it with exponential backoff until it succeeds,
2107   # or until the current retry_count is exhausted.  After each failure
2108   # that can be retried, the second function will be called with
2109   # the current try count (0-based), next try time, and error message.
2110   my $operation = shift;
2111   my $retry_callback = shift;
2112   my $retries = retry_count();
2113   foreach my $try_count (0..$retries) {
2114     my $next_try = time + (2 ** $try_count);
2115     my $result = eval { $operation->(@_); };
2116     if (!$@) {
2117       return $result;
2118     } elsif ($try_count < $retries) {
2119       $retry_callback->($try_count, $next_try, $@);
2120       my $sleep_time = $next_try - time;
2121       sleep($sleep_time) if ($sleep_time > 0);
2122     }
2123   }
2124   # Ensure the error message ends in a newline, so Perl doesn't add
2125   # retry_op's line number to it.
2126   chomp($@);
2127   die($@ . "\n");
2128 }
2129
2130 sub api_call {
2131   # Pass in a /-separated API method name, and arguments for it.
2132   # This function will call that method, retrying as needed until
2133   # the current retry_count is exhausted, with a log on the first failure.
2134   my $method_name = shift;
2135   my $log_api_retry = sub {
2136     my ($try_count, $next_try_at, $errmsg) = @_;
2137     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2138     $errmsg =~ s/\s/ /g;
2139     $errmsg =~ s/\s+$//;
2140     my $retry_msg;
2141     if ($next_try_at < time) {
2142       $retry_msg = "Retrying.";
2143     } else {
2144       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2145       $retry_msg = "Retrying at $next_try_fmt.";
2146     }
2147     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2148   };
2149   my $method = $arv;
2150   foreach my $key (split(/\//, $method_name)) {
2151     $method = $method->{$key};
2152   }
2153   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2154 }
2155
2156 sub exit_status_s {
2157   # Given a $?, return a human-readable exit code string like "0" or
2158   # "1" or "0 with signal 1" or "1 with signal 11".
2159   my $exitcode = shift;
2160   my $s = $exitcode >> 8;
2161   if ($exitcode & 0x7f) {
2162     $s .= " with signal " . ($exitcode & 0x7f);
2163   }
2164   if ($exitcode & 0x80) {
2165     $s .= " with core dump";
2166   }
2167   return $s;
2168 }
2169
2170 sub handle_readall {
2171   # Pass in a glob reference to a file handle.
2172   # Read all its contents and return them as a string.
2173   my $fh_glob_ref = shift;
2174   local $/ = undef;
2175   return <$fh_glob_ref>;
2176 }
2177
2178 sub tar_filename_n {
2179   my $n = shift;
2180   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2181 }
2182
2183 sub add_git_archive {
2184   # Pass in a git archive command as a string or list, a la system().
2185   # This method will save its output to be included in the archive sent to the
2186   # build script.
2187   my $git_input;
2188   $git_tar_count++;
2189   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2190     croak("Failed to save git archive: $!");
2191   }
2192   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2193   close($git_input);
2194   waitpid($git_pid, 0);
2195   close(GIT_ARCHIVE);
2196   if ($?) {
2197     croak("Failed to save git archive: git exited " . exit_status_s($?));
2198   }
2199 }
2200
2201 sub combined_git_archive {
2202   # Combine all saved tar archives into a single archive, then return its
2203   # contents in a string.  Return undef if no archives have been saved.
2204   if ($git_tar_count < 1) {
2205     return undef;
2206   }
2207   my $base_tar_name = tar_filename_n(1);
2208   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2209     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2210     if ($tar_exit != 0) {
2211       croak("Error preparing build archive: tar -A exited " .
2212             exit_status_s($tar_exit));
2213     }
2214   }
2215   if (!open(GIT_TAR, "<", $base_tar_name)) {
2216     croak("Could not open build archive: $!");
2217   }
2218   my $tar_contents = handle_readall(\*GIT_TAR);
2219   close(GIT_TAR);
2220   return $tar_contents;
2221 }
2222
2223 sub set_nonblocking {
2224   my $fh = shift;
2225   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2226   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2227 }
2228
2229 __DATA__
2230 #!/usr/bin/env perl
2231 #
2232 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2233 # server invokes this script on individual compute nodes, or localhost if we're
2234 # running a job locally.  It gets called in two modes:
2235 #
2236 # * No arguments: Installation mode.  Read a tar archive from the DATA
2237 #   file handle; it includes the Crunch script's source code, and
2238 #   maybe SDKs as well.  Those should be installed in the proper
2239 #   locations.  This runs outside of any Docker container, so don't try to
2240 #   introspect Crunch's runtime environment.
2241 #
2242 # * With arguments: Crunch script run mode.  This script should set up the
2243 #   environment, then run the command specified in the arguments.  This runs
2244 #   inside any Docker container.
2245
2246 use Fcntl ':flock';
2247 use File::Path qw( make_path remove_tree );
2248 use POSIX qw(getcwd);
2249
2250 use constant TASK_TEMPFAIL => 111;
2251
2252 # Map SDK subdirectories to the path environments they belong to.
2253 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2254
2255 my $destdir = $ENV{"CRUNCH_SRC"};
2256 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2257 my $repo = $ENV{"CRUNCH_SRC_URL"};
2258 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2259 my $job_work = $ENV{"JOB_WORK"};
2260 my $task_work = $ENV{"TASK_WORK"};
2261
2262 open(STDOUT_ORIG, ">&", STDOUT);
2263 open(STDERR_ORIG, ">&", STDERR);
2264
2265 for my $dir ($destdir, $job_work, $task_work) {
2266   if ($dir) {
2267     make_path $dir;
2268     -e $dir or die "Failed to create temporary directory ($dir): $!";
2269   }
2270 }
2271
2272 if ($task_work) {
2273   remove_tree($task_work, {keep_root => 1});
2274 }
2275
2276 ### Crunch script run mode
2277 if (@ARGV) {
2278   # We want to do routine logging during task 0 only.  This gives the user
2279   # the information they need, but avoids repeating the information for every
2280   # task.
2281   my $Log;
2282   if ($ENV{TASK_SEQUENCE} eq "0") {
2283     $Log = sub {
2284       my $msg = shift;
2285       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2286     };
2287   } else {
2288     $Log = sub { };
2289   }
2290
2291   my $python_src = "$install_dir/python";
2292   my $venv_dir = "$job_work/.arvados.venv";
2293   my $venv_built = -e "$venv_dir/bin/activate";
2294   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2295     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2296                  "--python=python2.7", $venv_dir);
2297     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2298     $venv_built = 1;
2299     $Log->("Built Python SDK virtualenv");
2300   }
2301
2302   my @pysdk_version_cmd = ("python", "-c",
2303     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2304   if ($venv_built) {
2305     $Log->("Running in Python SDK virtualenv");
2306     @pysdk_version_cmd = ();
2307     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2308     @ARGV = ("/bin/sh", "-ec",
2309              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2310   } elsif (-d $python_src) {
2311     $Log->("Warning: virtualenv not found inside Docker container default " .
2312            "\$PATH. Can't install Python SDK.");
2313   }
2314
2315   if (@pysdk_version_cmd) {
2316     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2317     my $pysdk_version = <$pysdk_version_pipe>;
2318     close($pysdk_version_pipe);
2319     if ($? == 0) {
2320       chomp($pysdk_version);
2321       $Log->("Using Arvados SDK version $pysdk_version");
2322     } else {
2323       # A lot could've gone wrong here, but pretty much all of it means that
2324       # Python won't be able to load the Arvados SDK.
2325       $Log->("Warning: Arvados SDK not found");
2326     }
2327   }
2328
2329   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2330     my $sdk_path = "$install_dir/$sdk_dir";
2331     if (-d $sdk_path) {
2332       if ($ENV{$sdk_envkey}) {
2333         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2334       } else {
2335         $ENV{$sdk_envkey} = $sdk_path;
2336       }
2337       $Log->("Arvados SDK added to %s", $sdk_envkey);
2338     }
2339   }
2340
2341   exec(@ARGV);
2342   die "Cannot exec `@ARGV`: $!";
2343 }
2344
2345 ### Installation mode
2346 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2347 flock L, LOCK_EX;
2348 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2349   # This exact git archive (source + arvados sdk) is already installed
2350   # here, so there's no need to reinstall it.
2351
2352   # We must consume our DATA section, though: otherwise the process
2353   # feeding it to us will get SIGPIPE.
2354   my $buf;
2355   while (read(DATA, $buf, 65536)) { }
2356
2357   exit(0);
2358 }
2359
2360 unlink "$destdir.archive_hash";
2361 mkdir $destdir;
2362
2363 do {
2364   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2365   local $SIG{PIPE} = "IGNORE";
2366   warn "Extracting archive: $archive_hash\n";
2367   # --ignore-zeros is necessary sometimes: depending on how much NUL
2368   # padding tar -A put on our combined archive (which in turn depends
2369   # on the length of the component archives) tar without
2370   # --ignore-zeros will exit before consuming stdin and cause close()
2371   # to fail on the resulting SIGPIPE.
2372   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2373     die "Error launching 'tar -xC $destdir': $!";
2374   }
2375   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2376   # get SIGPIPE.  We must feed it data incrementally.
2377   my $tar_input;
2378   while (read(DATA, $tar_input, 65536)) {
2379     print TARX $tar_input;
2380   }
2381   if(!close(TARX)) {
2382     die "'tar -xC $destdir' exited $?: $!";
2383   }
2384 };
2385
2386 mkdir $install_dir;
2387
2388 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2389 if (-d $sdk_root) {
2390   foreach my $sdk_lang (("python",
2391                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2392     if (-d "$sdk_root/$sdk_lang") {
2393       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2394         die "Failed to install $sdk_lang SDK: $!";
2395       }
2396     }
2397   }
2398 }
2399
2400 my $python_dir = "$install_dir/python";
2401 if ((-d $python_dir) and can_run("python2.7")) {
2402   open(my $egg_info_pipe, "-|",
2403        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2404   my @egg_info_errors = <$egg_info_pipe>;
2405   close($egg_info_pipe);
2406
2407   if ($?) {
2408     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2409       # egg_info apparently failed because it couldn't ask git for a build tag.
2410       # Specify no build tag.
2411       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2412       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2413       close($pysdk_cfg);
2414     } else {
2415       my $egg_info_exit = $? >> 8;
2416       foreach my $errline (@egg_info_errors) {
2417         warn $errline;
2418       }
2419       warn "python setup.py egg_info failed: exit $egg_info_exit";
2420       exit ($egg_info_exit || 1);
2421     }
2422   }
2423 }
2424
2425 # Hide messages from the install script (unless it fails: shell_or_die
2426 # will show $destdir.log in that case).
2427 open(STDOUT, ">>", "$destdir.log");
2428 open(STDERR, ">&", STDOUT);
2429
2430 if (-e "$destdir/crunch_scripts/install") {
2431     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2432 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2433     # Old version
2434     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2435 } elsif (-e "./install.sh") {
2436     shell_or_die (undef, "./install.sh", $install_dir);
2437 }
2438
2439 if ($archive_hash) {
2440     unlink "$destdir.archive_hash.new";
2441     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2442     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2443 }
2444
2445 close L;
2446
2447 sub can_run {
2448   my $command_name = shift;
2449   open(my $which, "-|", "which", $command_name);
2450   while (<$which>) { }
2451   close($which);
2452   return ($? == 0);
2453 }
2454
2455 sub shell_or_die
2456 {
2457   my $exitcode = shift;
2458
2459   if ($ENV{"DEBUG"}) {
2460     print STDERR "@_\n";
2461   }
2462   if (system (@_) != 0) {
2463     my $err = $!;
2464     my $code = $?;
2465     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2466     open STDERR, ">&STDERR_ORIG";
2467     system ("cat $destdir.log >&2");
2468     warn "@_ failed ($err): $exitstatus";
2469     if (defined($exitcode)) {
2470       exit $exitcode;
2471     }
2472     else {
2473       exit (($code >> 8) || 1);
2474     }
2475   }
2476 }
2477
2478 __DATA__