Merge branch '8343-cgroup-root' closes #8343
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $cgroup_root = "/sys/fs/cgroup";
130 my $docker_bin = "docker.io";
131 my $docker_run_args = "";
132 GetOptions('force-unlock' => \$force_unlock,
133            'git-dir=s' => \$git_dir,
134            'job=s' => \$jobspec,
135            'job-api-token=s' => \$job_api_token,
136            'no-clear-tmp' => \$no_clear_tmp,
137            'resume-stash=s' => \$resume_stash,
138            'cgroup-root=s' => \$cgroup_root,
139            'docker-bin=s' => \$docker_bin,
140            'docker-run-args=s' => \$docker_run_args,
141     );
142
143 if (defined $job_api_token) {
144   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
145 }
146
147 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
148
149
150 $SIG{'USR1'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 1;
153 };
154 $SIG{'USR2'} = sub
155 {
156   $main::ENV{CRUNCH_DEBUG} = 0;
157 };
158
159 my $arv = Arvados->new('apiVersion' => 'v1');
160
161 my $Job;
162 my $job_id;
163 my $dbh;
164 my $sth;
165 my @jobstep;
166
167 my $local_job;
168 if ($jobspec =~ /^[-a-z\d]+$/)
169 {
170   # $jobspec is an Arvados UUID, not a JSON job specification
171   $Job = api_call("jobs/get", uuid => $jobspec);
172   $local_job = 0;
173 }
174 else
175 {
176   $local_job = JSON::decode_json($jobspec);
177 }
178
179
180 # Make sure our workers (our slurm nodes, localhost, or whatever) are
181 # at least able to run basic commands: they aren't down or severely
182 # misconfigured.
183 my $cmd = ['true'];
184 if (($Job || $local_job)->{docker_image_locator}) {
185   $cmd = [$docker_bin, 'ps', '-q'];
186 }
187 Log(undef, "Sanity check is `@$cmd`");
188 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
189      $cmd,
190      {fork => 1});
191 if ($? != 0) {
192   Log(undef, "Sanity check failed: ".exit_status_s($?));
193   exit EX_TEMPFAIL;
194 }
195 Log(undef, "Sanity check OK");
196
197
198 my $User = api_call("users/current");
199
200 if (!$local_job) {
201   if (!$force_unlock) {
202     # Claim this job, and make sure nobody else does
203     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
204     if ($@) {
205       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
206       exit EX_TEMPFAIL;
207     };
208   }
209 }
210 else
211 {
212   if (!$resume_stash)
213   {
214     map { croak ("No $_ specified") unless $local_job->{$_} }
215     qw(script script_version script_parameters);
216   }
217
218   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
219   $local_job->{'started_at'} = gmtime;
220   $local_job->{'state'} = 'Running';
221
222   $Job = api_call("jobs/create", job => $local_job);
223 }
224 $job_id = $Job->{'uuid'};
225
226 my $keep_logfile = $job_id . '.log.txt';
227 log_writer_start($keep_logfile);
228
229 $Job->{'runtime_constraints'} ||= {};
230 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
231 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
232
233 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
234 if ($? == 0) {
235   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
236   chomp($gem_versions);
237   chop($gem_versions);  # Closing parentheses
238 } else {
239   $gem_versions = "";
240 }
241 Log(undef,
242     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
243
244 Log (undef, "check slurm allocation");
245 my @slot;
246 my @node;
247 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
248 my @sinfo;
249 if (!$have_slurm)
250 {
251   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
252   push @sinfo, "$localcpus localhost";
253 }
254 if (exists $ENV{SLURM_NODELIST})
255 {
256   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
257 }
258 foreach (@sinfo)
259 {
260   my ($ncpus, $slurm_nodelist) = split;
261   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
262
263   my @nodelist;
264   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
265   {
266     my $nodelist = $1;
267     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
268     {
269       my $ranges = $1;
270       foreach (split (",", $ranges))
271       {
272         my ($a, $b);
273         if (/(\d+)-(\d+)/)
274         {
275           $a = $1;
276           $b = $2;
277         }
278         else
279         {
280           $a = $_;
281           $b = $_;
282         }
283         push @nodelist, map {
284           my $n = $nodelist;
285           $n =~ s/\[[-,\d]+\]/$_/;
286           $n;
287         } ($a..$b);
288       }
289     }
290     else
291     {
292       push @nodelist, $nodelist;
293     }
294   }
295   foreach my $nodename (@nodelist)
296   {
297     Log (undef, "node $nodename - $ncpus slots");
298     my $node = { name => $nodename,
299                  ncpus => $ncpus,
300                  # The number of consecutive times a task has been dispatched
301                  # to this node and failed.
302                  losing_streak => 0,
303                  # The number of consecutive times that SLURM has reported
304                  # a node failure since the last successful task.
305                  fail_count => 0,
306                  # Don't dispatch work to this node until this time
307                  # (in seconds since the epoch) has passed.
308                  hold_until => 0 };
309     foreach my $cpu (1..$ncpus)
310     {
311       push @slot, { node => $node,
312                     cpu => $cpu };
313     }
314   }
315   push @node, @nodelist;
316 }
317
318
319
320 # Ensure that we get one jobstep running on each allocated node before
321 # we start overloading nodes with concurrent steps
322
323 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
324
325
326 $Job->update_attributes(
327   'tasks_summary' => { 'failed' => 0,
328                        'todo' => 1,
329                        'running' => 0,
330                        'done' => 0 });
331
332 Log (undef, "start");
333 $SIG{'INT'} = sub { $main::please_freeze = 1; };
334 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
335 $SIG{'TERM'} = \&croak;
336 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
337 $SIG{'ALRM'} = sub { $main::please_info = 1; };
338 $SIG{'CONT'} = sub { $main::please_continue = 1; };
339 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
340
341 $main::please_freeze = 0;
342 $main::please_info = 0;
343 $main::please_continue = 0;
344 $main::please_refresh = 0;
345 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
346
347 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
348 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
349 $ENV{"JOB_UUID"} = $job_id;
350
351
352 my @jobstep_todo = ();
353 my @jobstep_done = ();
354 my @jobstep_tomerge = ();
355 my $jobstep_tomerge_level = 0;
356 my $squeue_checked = 0;
357 my $latest_refresh = scalar time;
358
359
360
361 if (defined $Job->{thawedfromkey})
362 {
363   thaw ($Job->{thawedfromkey});
364 }
365 else
366 {
367   my $first_task = api_call("job_tasks/create", job_task => {
368     'job_uuid' => $Job->{'uuid'},
369     'sequence' => 0,
370     'qsequence' => 0,
371     'parameters' => {},
372   });
373   push @jobstep, { 'level' => 0,
374                    'failures' => 0,
375                    'arvados_task' => $first_task,
376                  };
377   push @jobstep_todo, 0;
378 }
379
380
381 if (!$have_slurm)
382 {
383   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
384 }
385
386 my $build_script = handle_readall(\*DATA);
387 my $nodelist = join(",", @node);
388 my $git_tar_count = 0;
389
390 if (!defined $no_clear_tmp) {
391   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
392   Log (undef, "Clean work dirs");
393
394   my $cleanpid = fork();
395   if ($cleanpid == 0)
396   {
397     # Find FUSE mounts under $CRUNCH_TMP and unmount them.
398     # Then clean up work directories.
399     # TODO: When #5036 is done and widely deployed, we can limit mount's
400     # -t option to simply fuse.keep.
401     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
402           ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
403     exit (1);
404   }
405   while (1)
406   {
407     last if $cleanpid == waitpid (-1, WNOHANG);
408     freeze_if_want_freeze ($cleanpid);
409     select (undef, undef, undef, 0.1);
410   }
411   if ($?) {
412     Log(undef, "Clean work dirs: exit ".exit_status_s($?));
413     exit(EX_RETRY_UNLOCKED);
414   }
415 }
416
417 # If this job requires a Docker image, install that.
418 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
419 if ($docker_locator = $Job->{docker_image_locator}) {
420   Log (undef, "Install docker image $docker_locator");
421   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
422   if (!$docker_hash)
423   {
424     croak("No Docker image hash found from locator $docker_locator");
425   }
426   Log (undef, "docker image hash is $docker_hash");
427   $docker_stream =~ s/^\.//;
428   my $docker_install_script = qq{
429 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
430     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
431 fi
432 };
433   my $docker_pid = fork();
434   if ($docker_pid == 0)
435   {
436     srun (["srun", "--nodelist=" . join(',', @node)],
437           ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script]);
438     exit ($?);
439   }
440   while (1)
441   {
442     last if $docker_pid == waitpid (-1, WNOHANG);
443     freeze_if_want_freeze ($docker_pid);
444     select (undef, undef, undef, 0.1);
445   }
446   if ($? != 0)
447   {
448     Log(undef, "Installing Docker image from $docker_locator exited " . exit_status_s($?));
449     exit(EX_RETRY_UNLOCKED);
450   }
451
452   # Determine whether this version of Docker supports memory+swap limits.
453   srun(["srun", "--nodelist=" . $node[0]],
454        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
455       {fork => 1});
456   $docker_limitmem = ($? == 0);
457
458   # Find a non-root Docker user to use.
459   # Tries the default user for the container, then 'crunch', then 'nobody',
460   # testing for whether the actual user id is non-zero.  This defends against
461   # mistakes but not malice, but we intend to harden the security in the future
462   # so we don't want anyone getting used to their jobs running as root in their
463   # Docker containers.
464   my @tryusers = ("", "crunch", "nobody");
465   foreach my $try_user (@tryusers) {
466     my $try_user_arg;
467     if ($try_user eq "") {
468       Log(undef, "Checking if container default user is not UID 0");
469       $try_user_arg = "";
470     } else {
471       Log(undef, "Checking if user '$try_user' is not UID 0");
472       $try_user_arg = "--user=$try_user";
473     }
474     srun(["srun", "--nodelist=" . $node[0]],
475          ["/bin/sh", "-ec",
476           "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
477           " test \$a -ne 0"],
478          {fork => 1});
479     if ($? == 0) {
480       $dockeruserarg = $try_user_arg;
481       if ($try_user eq "") {
482         Log(undef, "Container will run with default user");
483       } else {
484         Log(undef, "Container will run with $dockeruserarg");
485       }
486       last;
487     }
488   }
489
490   if (!defined $dockeruserarg) {
491     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
492   }
493
494   if ($Job->{arvados_sdk_version}) {
495     # The job also specifies an Arvados SDK version.  Add the SDKs to the
496     # tar file for the build script to install.
497     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
498                        $Job->{arvados_sdk_version}));
499     add_git_archive("git", "--git-dir=$git_dir", "archive",
500                     "--prefix=.arvados.sdk/",
501                     $Job->{arvados_sdk_version}, "sdk");
502   }
503 }
504
505 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
506   # If script_version looks like an absolute path, *and* the --git-dir
507   # argument was not given -- which implies we were not invoked by
508   # crunch-dispatch -- we will use the given path as a working
509   # directory instead of resolving script_version to a git commit (or
510   # doing anything else with git).
511   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
512   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
513 }
514 else {
515   # Resolve the given script_version to a git commit sha1. Also, if
516   # the repository is remote, clone it into our local filesystem: this
517   # ensures "git archive" will work, and is necessary to reliably
518   # resolve a symbolic script_version like "master^".
519   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
520
521   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
522
523   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
524
525   # If we're running under crunch-dispatch, it will have already
526   # pulled the appropriate source tree into its own repository, and
527   # given us that repo's path as $git_dir.
528   #
529   # If we're running a "local" job, we might have to fetch content
530   # from a remote repository.
531   #
532   # (Currently crunch-dispatch gives a local path with --git-dir, but
533   # we might as well accept URLs there too in case it changes its
534   # mind.)
535   my $repo = $git_dir || $Job->{'repository'};
536
537   # Repository can be remote or local. If remote, we'll need to fetch it
538   # to a local dir before doing `git log` et al.
539   my $repo_location;
540
541   if ($repo =~ m{://|^[^/]*:}) {
542     # $repo is a git url we can clone, like git:// or https:// or
543     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
544     # not recognized here because distinguishing that from a local
545     # path is too fragile. If you really need something strange here,
546     # use the ssh:// form.
547     $repo_location = 'remote';
548   } elsif ($repo =~ m{^\.*/}) {
549     # $repo is a local path to a git index. We'll also resolve ../foo
550     # to ../foo/.git if the latter is a directory. To help
551     # disambiguate local paths from named hosted repositories, this
552     # form must be given as ./ or ../ if it's a relative path.
553     if (-d "$repo/.git") {
554       $repo = "$repo/.git";
555     }
556     $repo_location = 'local';
557   } else {
558     # $repo is none of the above. It must be the name of a hosted
559     # repository.
560     my $arv_repo_list = api_call("repositories/list",
561                                  'filters' => [['name','=',$repo]]);
562     my @repos_found = @{$arv_repo_list->{'items'}};
563     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
564     if ($n_found > 0) {
565       Log(undef, "Repository '$repo' -> "
566           . join(", ", map { $_->{'uuid'} } @repos_found));
567     }
568     if ($n_found != 1) {
569       croak("Error: Found $n_found repositories with name '$repo'.");
570     }
571     $repo = $repos_found[0]->{'fetch_url'};
572     $repo_location = 'remote';
573   }
574   Log(undef, "Using $repo_location repository '$repo'");
575   $ENV{"CRUNCH_SRC_URL"} = $repo;
576
577   # Resolve given script_version (we'll call that $treeish here) to a
578   # commit sha1 ($commit).
579   my $treeish = $Job->{'script_version'};
580   my $commit;
581   if ($repo_location eq 'remote') {
582     # We minimize excess object-fetching by re-using the same bare
583     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
584     # just keep adding remotes to it as needed.
585     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
586     my $gitcmd = "git --git-dir=\Q$local_repo\E";
587
588     # Set up our local repo for caching remote objects, making
589     # archives, etc.
590     if (!-d $local_repo) {
591       make_path($local_repo) or croak("Error: could not create $local_repo");
592     }
593     # This works (exits 0 and doesn't delete fetched objects) even
594     # if $local_repo is already initialized:
595     `$gitcmd init --bare`;
596     if ($?) {
597       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
598     }
599
600     # If $treeish looks like a hash (or abbrev hash) we look it up in
601     # our local cache first, since that's cheaper. (We don't want to
602     # do that with tags/branches though -- those change over time, so
603     # they should always be resolved by the remote repo.)
604     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
605       # Hide stderr because it's normal for this to fail:
606       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
607       if ($? == 0 &&
608           # Careful not to resolve a branch named abcdeff to commit 1234567:
609           $sha1 =~ /^$treeish/ &&
610           $sha1 =~ /^([0-9a-f]{40})$/s) {
611         $commit = $1;
612         Log(undef, "Commit $commit already present in $local_repo");
613       }
614     }
615
616     if (!defined $commit) {
617       # If $treeish isn't just a hash or abbrev hash, or isn't here
618       # yet, we need to fetch the remote to resolve it correctly.
619
620       # First, remove all local heads. This prevents a name that does
621       # not exist on the remote from resolving to (or colliding with)
622       # a previously fetched branch or tag (possibly from a different
623       # remote).
624       remove_tree("$local_repo/refs/heads", {keep_root => 1});
625
626       Log(undef, "Fetching objects from $repo to $local_repo");
627       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
628       if ($?) {
629         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
630       }
631     }
632
633     # Now that the data is all here, we will use our local repo for
634     # the rest of our git activities.
635     $repo = $local_repo;
636   }
637
638   my $gitcmd = "git --git-dir=\Q$repo\E";
639   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
640   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
641     croak("`$gitcmd rev-list` exited "
642           .exit_status_s($?)
643           .", '$treeish' not found, giving up");
644   }
645   $commit = $1;
646   Log(undef, "Version $treeish is commit $commit");
647
648   if ($commit ne $Job->{'script_version'}) {
649     # Record the real commit id in the database, frozentokey, logs,
650     # etc. -- instead of an abbreviation or a branch name which can
651     # become ambiguous or point to a different commit in the future.
652     if (!$Job->update_attributes('script_version' => $commit)) {
653       croak("Error: failed to update job's script_version attribute");
654     }
655   }
656
657   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
658   add_git_archive("$gitcmd archive ''\Q$commit\E");
659 }
660
661 my $git_archive = combined_git_archive();
662 if (!defined $git_archive) {
663   Log(undef, "Skip install phase (no git archive)");
664   if ($have_slurm) {
665     Log(undef, "Warning: This probably means workers have no source tree!");
666   }
667 }
668 else {
669   my $install_exited;
670   my $install_script_tries_left = 3;
671   for (my $attempts = 0; $attempts < 3; $attempts++) {
672     Log(undef, "Run install script on all workers");
673
674     my @srunargs = ("srun",
675                     "--nodelist=$nodelist",
676                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
677     my @execargs = ("sh", "-c",
678                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
679
680     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
681     my ($install_stderr_r, $install_stderr_w);
682     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
683     set_nonblocking($install_stderr_r);
684     my $installpid = fork();
685     if ($installpid == 0)
686     {
687       close($install_stderr_r);
688       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
689       open(STDOUT, ">&", $install_stderr_w);
690       open(STDERR, ">&", $install_stderr_w);
691       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
692       exit (1);
693     }
694     close($install_stderr_w);
695     # Tell freeze_if_want_freeze how to kill the child, otherwise the
696     # "waitpid(installpid)" loop won't get interrupted by a freeze:
697     $proc{$installpid} = {};
698     my $stderr_buf = '';
699     # Track whether anything appears on stderr other than slurm errors
700     # ("srun: ...") and the "starting: ..." message printed by the
701     # srun subroutine itself:
702     my $stderr_anything_from_script = 0;
703     my $match_our_own_errors = '^(srun: error: |starting: \[)';
704     while ($installpid != waitpid(-1, WNOHANG)) {
705       freeze_if_want_freeze ($installpid);
706       # Wait up to 0.1 seconds for something to appear on stderr, then
707       # do a non-blocking read.
708       my $bits = fhbits($install_stderr_r);
709       select ($bits, undef, $bits, 0.1);
710       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
711       {
712         while ($stderr_buf =~ /^(.*?)\n/) {
713           my $line = $1;
714           substr $stderr_buf, 0, 1+length($line), "";
715           Log(undef, "stderr $line");
716           if ($line !~ /$match_our_own_errors/) {
717             $stderr_anything_from_script = 1;
718           }
719         }
720       }
721     }
722     delete $proc{$installpid};
723     $install_exited = $?;
724     close($install_stderr_r);
725     if (length($stderr_buf) > 0) {
726       if ($stderr_buf !~ /$match_our_own_errors/) {
727         $stderr_anything_from_script = 1;
728       }
729       Log(undef, "stderr $stderr_buf")
730     }
731
732     Log (undef, "Install script exited ".exit_status_s($install_exited));
733     last if $install_exited == 0 || $main::please_freeze;
734     # If the install script fails but doesn't print an error message,
735     # the next thing anyone is likely to do is just run it again in
736     # case it was a transient problem like "slurm communication fails
737     # because the network isn't reliable enough". So we'll just do
738     # that ourselves (up to 3 attempts in total). OTOH, if there is an
739     # error message, the problem is more likely to have a real fix and
740     # we should fail the job so the fixing process can start, instead
741     # of doing 2 more attempts.
742     last if $stderr_anything_from_script;
743   }
744
745   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
746     unlink($tar_filename);
747   }
748
749   if ($install_exited != 0) {
750     croak("Giving up");
751   }
752 }
753
754 foreach (qw (script script_version script_parameters runtime_constraints))
755 {
756   Log (undef,
757        "$_ " .
758        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
759 }
760 foreach (split (/\n/, $Job->{knobs}))
761 {
762   Log (undef, "knob " . $_);
763 }
764
765
766
767 $main::success = undef;
768
769
770
771 ONELEVEL:
772
773 my $thisround_succeeded = 0;
774 my $thisround_failed = 0;
775 my $thisround_failed_multiple = 0;
776 my $working_slot_count = scalar(@slot);
777
778 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
779                        or $a <=> $b } @jobstep_todo;
780 my $level = $jobstep[$jobstep_todo[0]]->{level};
781
782 my $initial_tasks_this_level = 0;
783 foreach my $id (@jobstep_todo) {
784   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
785 }
786
787 # If the number of tasks scheduled at this level #T is smaller than the number
788 # of slots available #S, only use the first #T slots, or the first slot on
789 # each node, whichever number is greater.
790 #
791 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
792 # based on these numbers.  Using fewer slots makes more resources available
793 # to each individual task, which should normally be a better strategy when
794 # there are fewer of them running with less parallelism.
795 #
796 # Note that this calculation is not redone if the initial tasks at
797 # this level queue more tasks at the same level.  This may harm
798 # overall task throughput for that level.
799 my @freeslot;
800 if ($initial_tasks_this_level < @node) {
801   @freeslot = (0..$#node);
802 } elsif ($initial_tasks_this_level < @slot) {
803   @freeslot = (0..$initial_tasks_this_level - 1);
804 } else {
805   @freeslot = (0..$#slot);
806 }
807 my $round_num_freeslots = scalar(@freeslot);
808
809 my %round_max_slots = ();
810 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
811   my $this_slot = $slot[$freeslot[$ii]];
812   my $node_name = $this_slot->{node}->{name};
813   $round_max_slots{$node_name} ||= $this_slot->{cpu};
814   last if (scalar(keys(%round_max_slots)) >= @node);
815 }
816
817 Log(undef, "start level $level with $round_num_freeslots slots");
818 my @holdslot;
819 my %reader;
820 my $progress_is_dirty = 1;
821 my $progress_stats_updated = 0;
822
823 update_progress_stats();
824
825
826 THISROUND:
827 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
828 {
829   # Don't create new tasks if we already know the job's final result.
830   last if defined($main::success);
831
832   my $id = $jobstep_todo[$todo_ptr];
833   my $Jobstep = $jobstep[$id];
834   if ($Jobstep->{level} != $level)
835   {
836     next;
837   }
838
839   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
840   set_nonblocking($reader{$id});
841
842   my $childslot = $freeslot[0];
843   my $childnode = $slot[$childslot]->{node};
844   my $childslotname = join (".",
845                             $slot[$childslot]->{node}->{name},
846                             $slot[$childslot]->{cpu});
847
848   my $childpid = fork();
849   if ($childpid == 0)
850   {
851     $SIG{'INT'} = 'DEFAULT';
852     $SIG{'QUIT'} = 'DEFAULT';
853     $SIG{'TERM'} = 'DEFAULT';
854
855     foreach (values (%reader))
856     {
857       close($_);
858     }
859     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
860     open(STDOUT,">&writer");
861     open(STDERR,">&writer");
862
863     undef $dbh;
864     undef $sth;
865
866     delete $ENV{"GNUPGHOME"};
867     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
868     $ENV{"TASK_QSEQUENCE"} = $id;
869     $ENV{"TASK_SEQUENCE"} = $level;
870     $ENV{"JOB_SCRIPT"} = $Job->{script};
871     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
872       $param =~ tr/a-z/A-Z/;
873       $ENV{"JOB_PARAMETER_$param"} = $value;
874     }
875     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
876     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
877     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
878     $ENV{"HOME"} = $ENV{"TASK_WORK"};
879     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
880     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
881     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
882
883     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
884
885     $ENV{"GZIP"} = "-n";
886
887     my @srunargs = (
888       "srun",
889       "--nodelist=".$childnode->{name},
890       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
891       "--job-name=$job_id.$id.$$",
892         );
893
894     my $stdbuf = " stdbuf --output=0 --error=0 ";
895
896     my $arv_file_cache = "";
897     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
898       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
899     }
900
901     my $command =
902         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
903         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
904         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
905         # These environment variables get used explicitly later in
906         # $command.  No tool is expected to read these values directly.
907         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
908         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
909         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
910         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
911
912     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
913     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
914     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
915
916     if ($docker_hash)
917     {
918       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
919       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
920       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
921       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
922       # We only set memory limits if Docker lets us limit both memory and swap.
923       # Memory limits alone have been supported longer, but subprocesses tend
924       # to get SIGKILL if they exceed that without any swap limit set.
925       # See #5642 for additional background.
926       if ($docker_limitmem) {
927         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
928       }
929
930       # The source tree and $destdir directory (which we have
931       # installed on the worker host) are available in the container,
932       # under the same path.
933       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
934       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
935
936       # Currently, we make the "by_pdh" directory in arv-mount's mount
937       # point appear at /keep inside the container (instead of using
938       # the same path as the host like we do with CRUNCH_SRC and
939       # CRUNCH_INSTALL). However, crunch scripts and utilities must
940       # not rely on this. They must use $TASK_KEEPMOUNT.
941       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
942       $ENV{TASK_KEEPMOUNT} = "/keep";
943
944       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
945       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
946       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
947
948       # TASK_WORK is almost exactly like a docker data volume: it
949       # starts out empty, is writable, and persists until no
950       # containers use it any more. We don't use --volumes-from to
951       # share it with other containers: it is only accessible to this
952       # task, and it goes away when this task stops.
953       #
954       # However, a docker data volume is writable only by root unless
955       # the mount point already happens to exist in the container with
956       # different permissions. Therefore, we [1] assume /tmp already
957       # exists in the image and is writable by the crunch user; [2]
958       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
959       # writable if they are created by docker while setting up the
960       # other --volumes); and [3] create $TASK_WORK inside the
961       # container using $build_script.
962       $command .= "--volume=/tmp ";
963       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
964       $ENV{"HOME"} = $ENV{"TASK_WORK"};
965       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
966
967       # TODO: Share a single JOB_WORK volume across all task
968       # containers on a given worker node, and delete it when the job
969       # ends (and, in case that doesn't work, when the next job
970       # starts).
971       #
972       # For now, use the same approach as TASK_WORK above.
973       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
974
975       while (my ($env_key, $env_val) = each %ENV)
976       {
977         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
978           $command .= "--env=\Q$env_key=$env_val\E ";
979         }
980       }
981       $command .= "--env=\QHOME=$ENV{HOME}\E ";
982       $command .= "\Q$docker_hash\E ";
983
984       if ($Job->{arvados_sdk_version}) {
985         $command .= $stdbuf;
986         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
987       } else {
988         $command .= "/bin/sh -c \'python -c " .
989             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
990             ">&2 2>/dev/null; " .
991             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
992             "if which stdbuf >/dev/null ; then " .
993             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
994             " else " .
995             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
996             " fi\'";
997       }
998     } else {
999       # Non-docker run
1000       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
1001       $command .= $stdbuf;
1002       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
1003     }
1004
1005     my @execargs = ('bash', '-c', $command);
1006     srun (\@srunargs, \@execargs, undef, $build_script);
1007     # exec() failed, we assume nothing happened.
1008     die "srun() failed on build script\n";
1009   }
1010   close("writer");
1011   if (!defined $childpid)
1012   {
1013     close $reader{$id};
1014     delete $reader{$id};
1015     next;
1016   }
1017   shift @freeslot;
1018   $proc{$childpid} = { jobstep => $id,
1019                        time => time,
1020                        slot => $childslot,
1021                        jobstepname => "$job_id.$id.$childpid",
1022                      };
1023   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1024   $slot[$childslot]->{pid} = $childpid;
1025
1026   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1027   Log ($id, "child $childpid started on $childslotname");
1028   $Jobstep->{starttime} = time;
1029   $Jobstep->{node} = $childnode->{name};
1030   $Jobstep->{slotindex} = $childslot;
1031   delete $Jobstep->{stderr};
1032   delete $Jobstep->{finishtime};
1033   delete $Jobstep->{tempfail};
1034
1035   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1036   $Jobstep->{'arvados_task'}->save;
1037
1038   splice @jobstep_todo, $todo_ptr, 1;
1039   --$todo_ptr;
1040
1041   $progress_is_dirty = 1;
1042
1043   while (!@freeslot
1044          ||
1045          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1046   {
1047     last THISROUND if $main::please_freeze;
1048     if ($main::please_info)
1049     {
1050       $main::please_info = 0;
1051       freeze();
1052       create_output_collection();
1053       save_meta(1);
1054       update_progress_stats();
1055     }
1056     my $gotsome
1057         = readfrompipes ()
1058         + reapchildren ();
1059     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1060     {
1061       check_refresh_wanted();
1062       check_squeue();
1063       update_progress_stats();
1064     }
1065     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1066     {
1067       update_progress_stats();
1068     }
1069     if (!$gotsome) {
1070       select (undef, undef, undef, 0.1);
1071     }
1072     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1073                                         $_->{node}->{hold_count} < 4 } @slot);
1074     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1075         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1076     {
1077       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1078           .($thisround_failed+$thisround_succeeded)
1079           .") -- giving up on this round";
1080       Log (undef, $message);
1081       last THISROUND;
1082     }
1083
1084     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1085     for (my $i=$#freeslot; $i>=0; $i--) {
1086       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1087         push @holdslot, (splice @freeslot, $i, 1);
1088       }
1089     }
1090     for (my $i=$#holdslot; $i>=0; $i--) {
1091       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1092         push @freeslot, (splice @holdslot, $i, 1);
1093       }
1094     }
1095
1096     # give up if no nodes are succeeding
1097     if ($working_slot_count < 1) {
1098       Log(undef, "Every node has failed -- giving up");
1099       last THISROUND;
1100     }
1101   }
1102 }
1103
1104
1105 push @freeslot, splice @holdslot;
1106 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1107
1108
1109 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1110 while (%proc)
1111 {
1112   if ($main::please_continue) {
1113     $main::please_continue = 0;
1114     goto THISROUND;
1115   }
1116   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1117   readfrompipes ();
1118   if (!reapchildren())
1119   {
1120     check_refresh_wanted();
1121     check_squeue();
1122     update_progress_stats();
1123     select (undef, undef, undef, 0.1);
1124     killem (keys %proc) if $main::please_freeze;
1125   }
1126 }
1127
1128 update_progress_stats();
1129 freeze_if_want_freeze();
1130
1131
1132 if (!defined $main::success)
1133 {
1134   if (!@jobstep_todo) {
1135     $main::success = 1;
1136   } elsif ($working_slot_count < 1) {
1137     save_output_collection();
1138     save_meta();
1139     exit(EX_RETRY_UNLOCKED);
1140   } elsif ($thisround_succeeded == 0 &&
1141            ($thisround_failed == 0 || $thisround_failed > 4)) {
1142     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1143     Log (undef, $message);
1144     $main::success = 0;
1145   }
1146 }
1147
1148 goto ONELEVEL if !defined $main::success;
1149
1150
1151 release_allocation();
1152 freeze();
1153 my $collated_output = save_output_collection();
1154 Log (undef, "finish");
1155
1156 save_meta();
1157
1158 my $final_state;
1159 if ($collated_output && $main::success) {
1160   $final_state = 'Complete';
1161 } else {
1162   $final_state = 'Failed';
1163 }
1164 $Job->update_attributes('state' => $final_state);
1165
1166 exit (($final_state eq 'Complete') ? 0 : 1);
1167
1168
1169
1170 sub update_progress_stats
1171 {
1172   $progress_stats_updated = time;
1173   return if !$progress_is_dirty;
1174   my ($todo, $done, $running) = (scalar @jobstep_todo,
1175                                  scalar @jobstep_done,
1176                                  scalar keys(%proc));
1177   $Job->{'tasks_summary'} ||= {};
1178   $Job->{'tasks_summary'}->{'todo'} = $todo;
1179   $Job->{'tasks_summary'}->{'done'} = $done;
1180   $Job->{'tasks_summary'}->{'running'} = $running;
1181   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1182   Log (undef, "status: $done done, $running running, $todo todo");
1183   $progress_is_dirty = 0;
1184 }
1185
1186
1187
1188 sub reapchildren
1189 {
1190   my $pid = waitpid (-1, WNOHANG);
1191   return 0 if $pid <= 0;
1192
1193   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1194                   . "."
1195                   . $slot[$proc{$pid}->{slot}]->{cpu});
1196   my $jobstepid = $proc{$pid}->{jobstep};
1197   my $elapsed = time - $proc{$pid}->{time};
1198   my $Jobstep = $jobstep[$jobstepid];
1199
1200   my $childstatus = $?;
1201   my $exitvalue = $childstatus >> 8;
1202   my $exitinfo = "exit ".exit_status_s($childstatus);
1203   $Jobstep->{'arvados_task'}->reload;
1204   my $task_success = $Jobstep->{'arvados_task'}->{success};
1205
1206   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1207
1208   if (!defined $task_success) {
1209     # task did not indicate one way or the other --> fail
1210     Log($jobstepid, sprintf(
1211           "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1212           exit_status_s($childstatus)));
1213     $Jobstep->{'arvados_task'}->{success} = 0;
1214     $Jobstep->{'arvados_task'}->save;
1215     $task_success = 0;
1216   }
1217
1218   if (!$task_success)
1219   {
1220     my $temporary_fail;
1221     $temporary_fail ||= $Jobstep->{tempfail};
1222     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1223
1224     ++$thisround_failed;
1225     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1226
1227     # Check for signs of a failed or misconfigured node
1228     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1229         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1230       # Don't count this against jobstep failure thresholds if this
1231       # node is already suspected faulty and srun exited quickly
1232       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1233           $elapsed < 5) {
1234         Log ($jobstepid, "blaming failure on suspect node " .
1235              $slot[$proc{$pid}->{slot}]->{node}->{name});
1236         $temporary_fail ||= 1;
1237       }
1238       ban_node_by_slot($proc{$pid}->{slot});
1239     }
1240
1241     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1242                              ++$Jobstep->{'failures'},
1243                              $temporary_fail ? 'temporary' : 'permanent',
1244                              $elapsed));
1245
1246     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1247       # Give up on this task, and the whole job
1248       $main::success = 0;
1249     }
1250     # Put this task back on the todo queue
1251     push @jobstep_todo, $jobstepid;
1252     $Job->{'tasks_summary'}->{'failed'}++;
1253   }
1254   else
1255   {
1256     ++$thisround_succeeded;
1257     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1258     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1259     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1260     push @jobstep_done, $jobstepid;
1261     Log ($jobstepid, "success in $elapsed seconds");
1262   }
1263   $Jobstep->{exitcode} = $childstatus;
1264   $Jobstep->{finishtime} = time;
1265   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1266   $Jobstep->{'arvados_task'}->save;
1267   process_stderr ($jobstepid, $task_success);
1268   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1269                            length($Jobstep->{'arvados_task'}->{output}),
1270                            $Jobstep->{'arvados_task'}->{output}));
1271
1272   close $reader{$jobstepid};
1273   delete $reader{$jobstepid};
1274   delete $slot[$proc{$pid}->{slot}]->{pid};
1275   push @freeslot, $proc{$pid}->{slot};
1276   delete $proc{$pid};
1277
1278   if ($task_success) {
1279     # Load new tasks
1280     my $newtask_list = [];
1281     my $newtask_results;
1282     do {
1283       $newtask_results = api_call(
1284         "job_tasks/list",
1285         'where' => {
1286           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1287         },
1288         'order' => 'qsequence',
1289         'offset' => scalar(@$newtask_list),
1290       );
1291       push(@$newtask_list, @{$newtask_results->{items}});
1292     } while (@{$newtask_results->{items}});
1293     foreach my $arvados_task (@$newtask_list) {
1294       my $jobstep = {
1295         'level' => $arvados_task->{'sequence'},
1296         'failures' => 0,
1297         'arvados_task' => $arvados_task
1298       };
1299       push @jobstep, $jobstep;
1300       push @jobstep_todo, $#jobstep;
1301     }
1302   }
1303
1304   $progress_is_dirty = 1;
1305   1;
1306 }
1307
1308 sub check_refresh_wanted
1309 {
1310   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1311   if (@stat && $stat[9] > $latest_refresh) {
1312     $latest_refresh = scalar time;
1313     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1314     for my $attr ('cancelled_at',
1315                   'cancelled_by_user_uuid',
1316                   'cancelled_by_client_uuid',
1317                   'state') {
1318       $Job->{$attr} = $Job2->{$attr};
1319     }
1320     if ($Job->{'state'} ne "Running") {
1321       if ($Job->{'state'} eq "Cancelled") {
1322         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1323       } else {
1324         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1325       }
1326       $main::success = 0;
1327       $main::please_freeze = 1;
1328     }
1329   }
1330 }
1331
1332 sub check_squeue
1333 {
1334   my $last_squeue_check = $squeue_checked;
1335
1336   # Do not call `squeue` or check the kill list more than once every
1337   # 15 seconds.
1338   return if $last_squeue_check > time - 15;
1339   $squeue_checked = time;
1340
1341   # Look for children from which we haven't received stderr data since
1342   # the last squeue check. If no such children exist, all procs are
1343   # alive and there's no need to even look at squeue.
1344   #
1345   # As long as the crunchstat poll interval (10s) is shorter than the
1346   # squeue check interval (15s) this should make the squeue check an
1347   # infrequent event.
1348   my $silent_procs = 0;
1349   for my $procinfo (values %proc)
1350   {
1351     my $jobstep = $jobstep[$procinfo->{jobstep}];
1352     if ($jobstep->{stderr_at} < $last_squeue_check)
1353     {
1354       $silent_procs++;
1355     }
1356   }
1357   return if $silent_procs == 0;
1358
1359   # use killem() on procs whose killtime is reached
1360   while (my ($pid, $procinfo) = each %proc)
1361   {
1362     my $jobstep = $jobstep[$procinfo->{jobstep}];
1363     if (exists $procinfo->{killtime}
1364         && $procinfo->{killtime} <= time
1365         && $jobstep->{stderr_at} < $last_squeue_check)
1366     {
1367       my $sincewhen = "";
1368       if ($jobstep->{stderr_at}) {
1369         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1370       }
1371       Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1372       killem ($pid);
1373     }
1374   }
1375
1376   if (!$have_slurm)
1377   {
1378     # here is an opportunity to check for mysterious problems with local procs
1379     return;
1380   }
1381
1382   # Get a list of steps still running.  Note: squeue(1) says --steps
1383   # selects a format (which we override anyway) and allows us to
1384   # specify which steps we're interested in (which we don't).
1385   # Importantly, it also changes the meaning of %j from "job name" to
1386   # "step name" and (although this isn't mentioned explicitly in the
1387   # docs) switches from "one line per job" mode to "one line per step"
1388   # mode. Without it, we'd just get a list of one job, instead of a
1389   # list of N steps.
1390   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1391   if ($? != 0)
1392   {
1393     Log(undef, "warning: squeue exit status $? ($!)");
1394     return;
1395   }
1396   chop @squeue;
1397
1398   # which of my jobsteps are running, according to squeue?
1399   my %ok;
1400   for my $jobstepname (@squeue)
1401   {
1402     $ok{$jobstepname} = 1;
1403   }
1404
1405   # Check for child procs >60s old and not mentioned by squeue.
1406   while (my ($pid, $procinfo) = each %proc)
1407   {
1408     if ($procinfo->{time} < time - 60
1409         && $procinfo->{jobstepname}
1410         && !exists $ok{$procinfo->{jobstepname}}
1411         && !exists $procinfo->{killtime})
1412     {
1413       # According to slurm, this task has ended (successfully or not)
1414       # -- but our srun child hasn't exited. First we must wait (30
1415       # seconds) in case this is just a race between communication
1416       # channels. Then, if our srun child process still hasn't
1417       # terminated, we'll conclude some slurm communication
1418       # error/delay has caused the task to die without notifying srun,
1419       # and we'll kill srun ourselves.
1420       $procinfo->{killtime} = time + 30;
1421       Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1422     }
1423   }
1424 }
1425
1426
1427 sub release_allocation
1428 {
1429   if ($have_slurm)
1430   {
1431     Log (undef, "release job allocation");
1432     system "scancel $ENV{SLURM_JOB_ID}";
1433   }
1434 }
1435
1436
1437 sub readfrompipes
1438 {
1439   my $gotsome = 0;
1440   foreach my $job (keys %reader)
1441   {
1442     my $buf;
1443     if (0 < sysread ($reader{$job}, $buf, 65536))
1444     {
1445       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1446       $jobstep[$job]->{stderr_at} = time;
1447       $jobstep[$job]->{stderr} .= $buf;
1448
1449       # Consume everything up to the last \n
1450       preprocess_stderr ($job);
1451
1452       if (length ($jobstep[$job]->{stderr}) > 16384)
1453       {
1454         # If we get a lot of stderr without a newline, chop off the
1455         # front to avoid letting our buffer grow indefinitely.
1456         substr ($jobstep[$job]->{stderr},
1457                 0, length($jobstep[$job]->{stderr}) - 8192) = "";
1458       }
1459       $gotsome = 1;
1460     }
1461   }
1462   return $gotsome;
1463 }
1464
1465
1466 sub preprocess_stderr
1467 {
1468   my $job = shift;
1469
1470   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1471     my $line = $1;
1472     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1473     Log ($job, "stderr $line");
1474     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1475       # whoa.
1476       $main::please_freeze = 1;
1477     }
1478     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1479       my $job_slot_index = $jobstep[$job]->{slotindex};
1480       $slot[$job_slot_index]->{node}->{fail_count}++;
1481       $jobstep[$job]->{tempfail} = 1;
1482       ban_node_by_slot($job_slot_index);
1483     }
1484     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1485       $jobstep[$job]->{tempfail} = 1;
1486       ban_node_by_slot($jobstep[$job]->{slotindex});
1487     }
1488     elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1489       $jobstep[$job]->{tempfail} = 1;
1490     }
1491   }
1492 }
1493
1494
1495 sub process_stderr
1496 {
1497   my $job = shift;
1498   my $task_success = shift;
1499   preprocess_stderr ($job);
1500
1501   map {
1502     Log ($job, "stderr $_");
1503   } split ("\n", $jobstep[$job]->{stderr});
1504 }
1505
1506 sub fetch_block
1507 {
1508   my $hash = shift;
1509   my $keep;
1510   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1511     Log(undef, "fetch_block run error from arv-get $hash: $!");
1512     return undef;
1513   }
1514   my $output_block = "";
1515   while (1) {
1516     my $buf;
1517     my $bytes = sysread($keep, $buf, 1024 * 1024);
1518     if (!defined $bytes) {
1519       Log(undef, "fetch_block read error from arv-get: $!");
1520       $output_block = undef;
1521       last;
1522     } elsif ($bytes == 0) {
1523       # sysread returns 0 at the end of the pipe.
1524       last;
1525     } else {
1526       # some bytes were read into buf.
1527       $output_block .= $buf;
1528     }
1529   }
1530   close $keep;
1531   if ($?) {
1532     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1533     $output_block = undef;
1534   }
1535   return $output_block;
1536 }
1537
1538 # Create a collection by concatenating the output of all tasks (each
1539 # task's output is either a manifest fragment, a locator for a
1540 # manifest fragment stored in Keep, or nothing at all). Return the
1541 # portable_data_hash of the new collection.
1542 sub create_output_collection
1543 {
1544   Log (undef, "collate");
1545
1546   my ($child_out, $child_in);
1547   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1548 import arvados
1549 import sys
1550 print (arvados.api("v1").collections().
1551        create(body={"manifest_text": sys.stdin.read()}).
1552        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1553 }, retry_count());
1554
1555   my $task_idx = -1;
1556   my $manifest_size = 0;
1557   for (@jobstep)
1558   {
1559     ++$task_idx;
1560     my $output = $_->{'arvados_task'}->{output};
1561     next if (!defined($output));
1562     my $next_write;
1563     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1564       $next_write = fetch_block($output);
1565     } else {
1566       $next_write = $output;
1567     }
1568     if (defined($next_write)) {
1569       if (!defined(syswrite($child_in, $next_write))) {
1570         # There's been an error writing.  Stop the loop.
1571         # We'll log details about the exit code later.
1572         last;
1573       } else {
1574         $manifest_size += length($next_write);
1575       }
1576     } else {
1577       my $uuid = $_->{'arvados_task'}->{'uuid'};
1578       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1579       $main::success = 0;
1580     }
1581   }
1582   close($child_in);
1583   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1584
1585   my $joboutput;
1586   my $s = IO::Select->new($child_out);
1587   if ($s->can_read(120)) {
1588     sysread($child_out, $joboutput, 1024 * 1024);
1589     waitpid($pid, 0);
1590     if ($?) {
1591       Log(undef, "output collection creation exited " . exit_status_s($?));
1592       $joboutput = undef;
1593     } else {
1594       chomp($joboutput);
1595     }
1596   } else {
1597     Log (undef, "timed out while creating output collection");
1598     foreach my $signal (2, 2, 2, 15, 15, 9) {
1599       kill($signal, $pid);
1600       last if waitpid($pid, WNOHANG) == -1;
1601       sleep(1);
1602     }
1603   }
1604   close($child_out);
1605
1606   return $joboutput;
1607 }
1608
1609 # Calls create_output_collection, logs the result, and returns it.
1610 # If that was successful, save that as the output in the job record.
1611 sub save_output_collection {
1612   my $collated_output = create_output_collection();
1613
1614   if (!$collated_output) {
1615     Log(undef, "Failed to write output collection");
1616   }
1617   else {
1618     Log(undef, "job output $collated_output");
1619     $Job->update_attributes('output' => $collated_output);
1620   }
1621   return $collated_output;
1622 }
1623
1624 sub killem
1625 {
1626   foreach (@_)
1627   {
1628     my $sig = 2;                # SIGINT first
1629     if (exists $proc{$_}->{"sent_$sig"} &&
1630         time - $proc{$_}->{"sent_$sig"} > 4)
1631     {
1632       $sig = 15;                # SIGTERM if SIGINT doesn't work
1633     }
1634     if (exists $proc{$_}->{"sent_$sig"} &&
1635         time - $proc{$_}->{"sent_$sig"} > 4)
1636     {
1637       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1638     }
1639     if (!exists $proc{$_}->{"sent_$sig"})
1640     {
1641       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1642       kill $sig, $_;
1643       select (undef, undef, undef, 0.1);
1644       if ($sig == 2)
1645       {
1646         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1647       }
1648       $proc{$_}->{"sent_$sig"} = time;
1649       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1650     }
1651   }
1652 }
1653
1654
1655 sub fhbits
1656 {
1657   my($bits);
1658   for (@_) {
1659     vec($bits,fileno($_),1) = 1;
1660   }
1661   $bits;
1662 }
1663
1664
1665 # Send log output to Keep via arv-put.
1666 #
1667 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1668 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1669 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1670 # $log_pipe_pid is the pid of the arv-put subprocess.
1671 #
1672 # The only functions that should access these variables directly are:
1673 #
1674 # log_writer_start($logfilename)
1675 #     Starts an arv-put pipe, reading data on stdin and writing it to
1676 #     a $logfilename file in an output collection.
1677 #
1678 # log_writer_read_output([$timeout])
1679 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1680 #     Passes $timeout to the select() call, with a default of 0.01.
1681 #     Returns the result of the last read() call on $log_pipe_out, or
1682 #     -1 if read() wasn't called because select() timed out.
1683 #     Only other log_writer_* functions should need to call this.
1684 #
1685 # log_writer_send($txt)
1686 #     Writes $txt to the output log collection.
1687 #
1688 # log_writer_finish()
1689 #     Closes the arv-put pipe and returns the output that it produces.
1690 #
1691 # log_writer_is_active()
1692 #     Returns a true value if there is currently a live arv-put
1693 #     process, false otherwise.
1694 #
1695 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1696     $log_pipe_pid);
1697
1698 sub log_writer_start($)
1699 {
1700   my $logfilename = shift;
1701   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1702                         'arv-put',
1703                         '--stream',
1704                         '--retries', '3',
1705                         '--filename', $logfilename,
1706                         '-');
1707   $log_pipe_out_buf = "";
1708   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1709 }
1710
1711 sub log_writer_read_output {
1712   my $timeout = shift || 0.01;
1713   my $read = -1;
1714   while ($read && $log_pipe_out_select->can_read($timeout)) {
1715     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1716                  length($log_pipe_out_buf));
1717   }
1718   if (!defined($read)) {
1719     Log(undef, "error reading log manifest from arv-put: $!");
1720   }
1721   return $read;
1722 }
1723
1724 sub log_writer_send($)
1725 {
1726   my $txt = shift;
1727   print $log_pipe_in $txt;
1728   log_writer_read_output();
1729 }
1730
1731 sub log_writer_finish()
1732 {
1733   return unless $log_pipe_pid;
1734
1735   close($log_pipe_in);
1736
1737   my $logger_failed = 0;
1738   my $read_result = log_writer_read_output(120);
1739   if ($read_result == -1) {
1740     $logger_failed = -1;
1741     Log (undef, "timed out reading from 'arv-put'");
1742   } elsif ($read_result != 0) {
1743     $logger_failed = -2;
1744     Log(undef, "failed to read arv-put log manifest to EOF");
1745   }
1746
1747   waitpid($log_pipe_pid, 0);
1748   if ($?) {
1749     $logger_failed ||= $?;
1750     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1751   }
1752
1753   close($log_pipe_out);
1754   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1755   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1756       $log_pipe_out_select = undef;
1757
1758   return $arv_put_output;
1759 }
1760
1761 sub log_writer_is_active() {
1762   return $log_pipe_pid;
1763 }
1764
1765 sub Log                         # ($jobstep_id, $logmessage)
1766 {
1767   if ($_[1] =~ /\n/) {
1768     for my $line (split (/\n/, $_[1])) {
1769       Log ($_[0], $line);
1770     }
1771     return;
1772   }
1773   my $fh = select STDERR; $|=1; select $fh;
1774   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1775   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1776   $message .= "\n";
1777   my $datetime;
1778   if (log_writer_is_active() || -t STDERR) {
1779     my @gmtime = gmtime;
1780     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1781                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1782   }
1783   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1784
1785   if (log_writer_is_active()) {
1786     log_writer_send($datetime . " " . $message);
1787   }
1788 }
1789
1790
1791 sub croak
1792 {
1793   my ($package, $file, $line) = caller;
1794   my $message = "@_ at $file line $line\n";
1795   Log (undef, $message);
1796   freeze() if @jobstep_todo;
1797   create_output_collection() if @jobstep_todo;
1798   cleanup();
1799   save_meta();
1800   die;
1801 }
1802
1803
1804 sub cleanup
1805 {
1806   return unless $Job;
1807   if ($Job->{'state'} eq 'Cancelled') {
1808     $Job->update_attributes('finished_at' => scalar gmtime);
1809   } else {
1810     $Job->update_attributes('state' => 'Failed');
1811   }
1812 }
1813
1814
1815 sub save_meta
1816 {
1817   my $justcheckpoint = shift; # false if this will be the last meta saved
1818   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1819   return unless log_writer_is_active();
1820   my $log_manifest = log_writer_finish();
1821   return unless defined($log_manifest);
1822
1823   if ($Job->{log}) {
1824     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1825     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1826   }
1827
1828   my $log_coll = api_call(
1829     "collections/create", ensure_unique_name => 1, collection => {
1830       manifest_text => $log_manifest,
1831       owner_uuid => $Job->{owner_uuid},
1832       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1833     });
1834   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1835   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1836 }
1837
1838
1839 sub freeze_if_want_freeze
1840 {
1841   if ($main::please_freeze)
1842   {
1843     release_allocation();
1844     if (@_)
1845     {
1846       # kill some srun procs before freeze+stop
1847       map { $proc{$_} = {} } @_;
1848       while (%proc)
1849       {
1850         killem (keys %proc);
1851         select (undef, undef, undef, 0.1);
1852         my $died;
1853         while (($died = waitpid (-1, WNOHANG)) > 0)
1854         {
1855           delete $proc{$died};
1856         }
1857       }
1858     }
1859     freeze();
1860     create_output_collection();
1861     cleanup();
1862     save_meta();
1863     exit 1;
1864   }
1865 }
1866
1867
1868 sub freeze
1869 {
1870   Log (undef, "Freeze not implemented");
1871   return;
1872 }
1873
1874
1875 sub thaw
1876 {
1877   croak ("Thaw not implemented");
1878 }
1879
1880
1881 sub freezequote
1882 {
1883   my $s = shift;
1884   $s =~ s/\\/\\\\/g;
1885   $s =~ s/\n/\\n/g;
1886   return $s;
1887 }
1888
1889
1890 sub freezeunquote
1891 {
1892   my $s = shift;
1893   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1894   return $s;
1895 }
1896
1897
1898 sub srun
1899 {
1900   my $srunargs = shift;
1901   my $execargs = shift;
1902   my $opts = shift || {};
1903   my $stdin = shift;
1904   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1905
1906   $Data::Dumper::Terse = 1;
1907   $Data::Dumper::Indent = 0;
1908   my $show_cmd = Dumper($args);
1909   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1910   $show_cmd =~ s/\n/ /g;
1911   if ($opts->{fork}) {
1912     Log(undef, "starting: $show_cmd");
1913   } else {
1914     # This is a child process: parent is in charge of reading our
1915     # stderr and copying it to Log() if needed.
1916     warn "starting: $show_cmd\n";
1917   }
1918
1919   if (defined $stdin) {
1920     my $child = open STDIN, "-|";
1921     defined $child or die "no fork: $!";
1922     if ($child == 0) {
1923       print $stdin or die $!;
1924       close STDOUT or die $!;
1925       exit 0;
1926     }
1927   }
1928
1929   return system (@$args) if $opts->{fork};
1930
1931   exec @$args;
1932   warn "ENV size is ".length(join(" ",%ENV));
1933   die "exec failed: $!: @$args";
1934 }
1935
1936
1937 sub ban_node_by_slot {
1938   # Don't start any new jobsteps on this node for 60 seconds
1939   my $slotid = shift;
1940   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1941   $slot[$slotid]->{node}->{hold_count}++;
1942   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1943 }
1944
1945 sub must_lock_now
1946 {
1947   my ($lockfile, $error_message) = @_;
1948   open L, ">", $lockfile or croak("$lockfile: $!");
1949   if (!flock L, LOCK_EX|LOCK_NB) {
1950     croak("Can't lock $lockfile: $error_message\n");
1951   }
1952 }
1953
1954 sub find_docker_image {
1955   # Given a Keep locator, check to see if it contains a Docker image.
1956   # If so, return its stream name and Docker hash.
1957   # If not, return undef for both values.
1958   my $locator = shift;
1959   my ($streamname, $filename);
1960   my $image = api_call("collections/get", uuid => $locator);
1961   if ($image) {
1962     foreach my $line (split(/\n/, $image->{manifest_text})) {
1963       my @tokens = split(/\s+/, $line);
1964       next if (!@tokens);
1965       $streamname = shift(@tokens);
1966       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1967         if (defined($filename)) {
1968           return (undef, undef);  # More than one file in the Collection.
1969         } else {
1970           $filename = (split(/:/, $filedata, 3))[2];
1971         }
1972       }
1973     }
1974   }
1975   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1976     return ($streamname, $1);
1977   } else {
1978     return (undef, undef);
1979   }
1980 }
1981
1982 sub retry_count {
1983   # Calculate the number of times an operation should be retried,
1984   # assuming exponential backoff, and that we're willing to retry as
1985   # long as tasks have been running.  Enforce a minimum of 3 retries.
1986   my ($starttime, $endtime, $timediff, $retries);
1987   if (@jobstep) {
1988     $starttime = $jobstep[0]->{starttime};
1989     $endtime = $jobstep[-1]->{finishtime};
1990   }
1991   if (!defined($starttime)) {
1992     $timediff = 0;
1993   } elsif (!defined($endtime)) {
1994     $timediff = time - $starttime;
1995   } else {
1996     $timediff = ($endtime - $starttime) - (time - $endtime);
1997   }
1998   if ($timediff > 0) {
1999     $retries = int(log($timediff) / log(2));
2000   } else {
2001     $retries = 1;  # Use the minimum.
2002   }
2003   return ($retries > 3) ? $retries : 3;
2004 }
2005
2006 sub retry_op {
2007   # Pass in two function references.
2008   # This method will be called with the remaining arguments.
2009   # If it dies, retry it with exponential backoff until it succeeds,
2010   # or until the current retry_count is exhausted.  After each failure
2011   # that can be retried, the second function will be called with
2012   # the current try count (0-based), next try time, and error message.
2013   my $operation = shift;
2014   my $retry_callback = shift;
2015   my $retries = retry_count();
2016   foreach my $try_count (0..$retries) {
2017     my $next_try = time + (2 ** $try_count);
2018     my $result = eval { $operation->(@_); };
2019     if (!$@) {
2020       return $result;
2021     } elsif ($try_count < $retries) {
2022       $retry_callback->($try_count, $next_try, $@);
2023       my $sleep_time = $next_try - time;
2024       sleep($sleep_time) if ($sleep_time > 0);
2025     }
2026   }
2027   # Ensure the error message ends in a newline, so Perl doesn't add
2028   # retry_op's line number to it.
2029   chomp($@);
2030   die($@ . "\n");
2031 }
2032
2033 sub api_call {
2034   # Pass in a /-separated API method name, and arguments for it.
2035   # This function will call that method, retrying as needed until
2036   # the current retry_count is exhausted, with a log on the first failure.
2037   my $method_name = shift;
2038   my $log_api_retry = sub {
2039     my ($try_count, $next_try_at, $errmsg) = @_;
2040     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2041     $errmsg =~ s/\s/ /g;
2042     $errmsg =~ s/\s+$//;
2043     my $retry_msg;
2044     if ($next_try_at < time) {
2045       $retry_msg = "Retrying.";
2046     } else {
2047       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2048       $retry_msg = "Retrying at $next_try_fmt.";
2049     }
2050     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2051   };
2052   my $method = $arv;
2053   foreach my $key (split(/\//, $method_name)) {
2054     $method = $method->{$key};
2055   }
2056   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2057 }
2058
2059 sub exit_status_s {
2060   # Given a $?, return a human-readable exit code string like "0" or
2061   # "1" or "0 with signal 1" or "1 with signal 11".
2062   my $exitcode = shift;
2063   my $s = $exitcode >> 8;
2064   if ($exitcode & 0x7f) {
2065     $s .= " with signal " . ($exitcode & 0x7f);
2066   }
2067   if ($exitcode & 0x80) {
2068     $s .= " with core dump";
2069   }
2070   return $s;
2071 }
2072
2073 sub handle_readall {
2074   # Pass in a glob reference to a file handle.
2075   # Read all its contents and return them as a string.
2076   my $fh_glob_ref = shift;
2077   local $/ = undef;
2078   return <$fh_glob_ref>;
2079 }
2080
2081 sub tar_filename_n {
2082   my $n = shift;
2083   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2084 }
2085
2086 sub add_git_archive {
2087   # Pass in a git archive command as a string or list, a la system().
2088   # This method will save its output to be included in the archive sent to the
2089   # build script.
2090   my $git_input;
2091   $git_tar_count++;
2092   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2093     croak("Failed to save git archive: $!");
2094   }
2095   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2096   close($git_input);
2097   waitpid($git_pid, 0);
2098   close(GIT_ARCHIVE);
2099   if ($?) {
2100     croak("Failed to save git archive: git exited " . exit_status_s($?));
2101   }
2102 }
2103
2104 sub combined_git_archive {
2105   # Combine all saved tar archives into a single archive, then return its
2106   # contents in a string.  Return undef if no archives have been saved.
2107   if ($git_tar_count < 1) {
2108     return undef;
2109   }
2110   my $base_tar_name = tar_filename_n(1);
2111   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2112     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2113     if ($tar_exit != 0) {
2114       croak("Error preparing build archive: tar -A exited " .
2115             exit_status_s($tar_exit));
2116     }
2117   }
2118   if (!open(GIT_TAR, "<", $base_tar_name)) {
2119     croak("Could not open build archive: $!");
2120   }
2121   my $tar_contents = handle_readall(\*GIT_TAR);
2122   close(GIT_TAR);
2123   return $tar_contents;
2124 }
2125
2126 sub set_nonblocking {
2127   my $fh = shift;
2128   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2129   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2130 }
2131
2132 __DATA__
2133 #!/usr/bin/env perl
2134 #
2135 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2136 # server invokes this script on individual compute nodes, or localhost if we're
2137 # running a job locally.  It gets called in two modes:
2138 #
2139 # * No arguments: Installation mode.  Read a tar archive from the DATA
2140 #   file handle; it includes the Crunch script's source code, and
2141 #   maybe SDKs as well.  Those should be installed in the proper
2142 #   locations.  This runs outside of any Docker container, so don't try to
2143 #   introspect Crunch's runtime environment.
2144 #
2145 # * With arguments: Crunch script run mode.  This script should set up the
2146 #   environment, then run the command specified in the arguments.  This runs
2147 #   inside any Docker container.
2148
2149 use Fcntl ':flock';
2150 use File::Path qw( make_path remove_tree );
2151 use POSIX qw(getcwd);
2152
2153 use constant TASK_TEMPFAIL => 111;
2154
2155 # Map SDK subdirectories to the path environments they belong to.
2156 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2157
2158 my $destdir = $ENV{"CRUNCH_SRC"};
2159 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2160 my $repo = $ENV{"CRUNCH_SRC_URL"};
2161 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2162 my $job_work = $ENV{"JOB_WORK"};
2163 my $task_work = $ENV{"TASK_WORK"};
2164
2165 open(STDOUT_ORIG, ">&", STDOUT);
2166 open(STDERR_ORIG, ">&", STDERR);
2167
2168 for my $dir ($destdir, $job_work, $task_work) {
2169   if ($dir) {
2170     make_path $dir;
2171     -e $dir or die "Failed to create temporary directory ($dir): $!";
2172   }
2173 }
2174
2175 if ($task_work) {
2176   remove_tree($task_work, {keep_root => 1});
2177 }
2178
2179 ### Crunch script run mode
2180 if (@ARGV) {
2181   # We want to do routine logging during task 0 only.  This gives the user
2182   # the information they need, but avoids repeating the information for every
2183   # task.
2184   my $Log;
2185   if ($ENV{TASK_SEQUENCE} eq "0") {
2186     $Log = sub {
2187       my $msg = shift;
2188       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2189     };
2190   } else {
2191     $Log = sub { };
2192   }
2193
2194   my $python_src = "$install_dir/python";
2195   my $venv_dir = "$job_work/.arvados.venv";
2196   my $venv_built = -e "$venv_dir/bin/activate";
2197   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2198     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2199                  "--python=python2.7", $venv_dir);
2200     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2201     $venv_built = 1;
2202     $Log->("Built Python SDK virtualenv");
2203   }
2204
2205   my @pysdk_version_cmd = ("python", "-c",
2206     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2207   if ($venv_built) {
2208     $Log->("Running in Python SDK virtualenv");
2209     @pysdk_version_cmd = ();
2210     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2211     @ARGV = ("/bin/sh", "-ec",
2212              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2213   } elsif (-d $python_src) {
2214     $Log->("Warning: virtualenv not found inside Docker container default " .
2215            "\$PATH. Can't install Python SDK.");
2216   }
2217
2218   if (@pysdk_version_cmd) {
2219     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2220     my $pysdk_version = <$pysdk_version_pipe>;
2221     close($pysdk_version_pipe);
2222     if ($? == 0) {
2223       chomp($pysdk_version);
2224       $Log->("Using Arvados SDK version $pysdk_version");
2225     } else {
2226       # A lot could've gone wrong here, but pretty much all of it means that
2227       # Python won't be able to load the Arvados SDK.
2228       $Log->("Warning: Arvados SDK not found");
2229     }
2230   }
2231
2232   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2233     my $sdk_path = "$install_dir/$sdk_dir";
2234     if (-d $sdk_path) {
2235       if ($ENV{$sdk_envkey}) {
2236         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2237       } else {
2238         $ENV{$sdk_envkey} = $sdk_path;
2239       }
2240       $Log->("Arvados SDK added to %s", $sdk_envkey);
2241     }
2242   }
2243
2244   exec(@ARGV);
2245   die "Cannot exec `@ARGV`: $!";
2246 }
2247
2248 ### Installation mode
2249 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2250 flock L, LOCK_EX;
2251 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2252   # This exact git archive (source + arvados sdk) is already installed
2253   # here, so there's no need to reinstall it.
2254
2255   # We must consume our DATA section, though: otherwise the process
2256   # feeding it to us will get SIGPIPE.
2257   my $buf;
2258   while (read(DATA, $buf, 65536)) { }
2259
2260   exit(0);
2261 }
2262
2263 unlink "$destdir.archive_hash";
2264 mkdir $destdir;
2265
2266 do {
2267   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2268   local $SIG{PIPE} = "IGNORE";
2269   warn "Extracting archive: $archive_hash\n";
2270   # --ignore-zeros is necessary sometimes: depending on how much NUL
2271   # padding tar -A put on our combined archive (which in turn depends
2272   # on the length of the component archives) tar without
2273   # --ignore-zeros will exit before consuming stdin and cause close()
2274   # to fail on the resulting SIGPIPE.
2275   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2276     die "Error launching 'tar -xC $destdir': $!";
2277   }
2278   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2279   # get SIGPIPE.  We must feed it data incrementally.
2280   my $tar_input;
2281   while (read(DATA, $tar_input, 65536)) {
2282     print TARX $tar_input;
2283   }
2284   if(!close(TARX)) {
2285     die "'tar -xC $destdir' exited $?: $!";
2286   }
2287 };
2288
2289 mkdir $install_dir;
2290
2291 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2292 if (-d $sdk_root) {
2293   foreach my $sdk_lang (("python",
2294                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2295     if (-d "$sdk_root/$sdk_lang") {
2296       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2297         die "Failed to install $sdk_lang SDK: $!";
2298       }
2299     }
2300   }
2301 }
2302
2303 my $python_dir = "$install_dir/python";
2304 if ((-d $python_dir) and can_run("python2.7")) {
2305   open(my $egg_info_pipe, "-|",
2306        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2307   my @egg_info_errors = <$egg_info_pipe>;
2308   close($egg_info_pipe);
2309
2310   if ($?) {
2311     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2312       # egg_info apparently failed because it couldn't ask git for a build tag.
2313       # Specify no build tag.
2314       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2315       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2316       close($pysdk_cfg);
2317     } else {
2318       my $egg_info_exit = $? >> 8;
2319       foreach my $errline (@egg_info_errors) {
2320         warn $errline;
2321       }
2322       warn "python setup.py egg_info failed: exit $egg_info_exit";
2323       exit ($egg_info_exit || 1);
2324     }
2325   }
2326 }
2327
2328 # Hide messages from the install script (unless it fails: shell_or_die
2329 # will show $destdir.log in that case).
2330 open(STDOUT, ">>", "$destdir.log");
2331 open(STDERR, ">&", STDOUT);
2332
2333 if (-e "$destdir/crunch_scripts/install") {
2334     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2335 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2336     # Old version
2337     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2338 } elsif (-e "./install.sh") {
2339     shell_or_die (undef, "./install.sh", $install_dir);
2340 }
2341
2342 if ($archive_hash) {
2343     unlink "$destdir.archive_hash.new";
2344     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2345     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2346 }
2347
2348 close L;
2349
2350 sub can_run {
2351   my $command_name = shift;
2352   open(my $which, "-|", "which", $command_name);
2353   while (<$which>) { }
2354   close($which);
2355   return ($? == 0);
2356 }
2357
2358 sub shell_or_die
2359 {
2360   my $exitcode = shift;
2361
2362   if ($ENV{"DEBUG"}) {
2363     print STDERR "@_\n";
2364   }
2365   if (system (@_) != 0) {
2366     my $err = $!;
2367     my $code = $?;
2368     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2369     open STDERR, ">&STDERR_ORIG";
2370     system ("cat $destdir.log >&2");
2371     warn "@_ failed ($err): $exitstatus";
2372     if (defined($exitcode)) {
2373       exit $exitcode;
2374     }
2375     else {
2376       exit (($code >> 8) || 1);
2377     }
2378   }
2379 }
2380
2381 __DATA__