11138: Test for docker image after loading, in case docker-load erroneously proclaime...
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $cgroup_root = "/sys/fs/cgroup";
130 my $docker_bin = "docker.io";
131 my $docker_run_args = "";
132 GetOptions('force-unlock' => \$force_unlock,
133            'git-dir=s' => \$git_dir,
134            'job=s' => \$jobspec,
135            'job-api-token=s' => \$job_api_token,
136            'no-clear-tmp' => \$no_clear_tmp,
137            'resume-stash=s' => \$resume_stash,
138            'cgroup-root=s' => \$cgroup_root,
139            'docker-bin=s' => \$docker_bin,
140            'docker-run-args=s' => \$docker_run_args,
141     );
142
143 if (defined $job_api_token) {
144   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
145 }
146
147 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
148
149
150 $SIG{'USR1'} = sub
151 {
152   $main::ENV{CRUNCH_DEBUG} = 1;
153 };
154 $SIG{'USR2'} = sub
155 {
156   $main::ENV{CRUNCH_DEBUG} = 0;
157 };
158
159 my $arv = Arvados->new('apiVersion' => 'v1');
160
161 my $Job;
162 my $job_id;
163 my $dbh;
164 my $sth;
165 my @jobstep;
166
167 my $local_job;
168 if ($jobspec =~ /^[-a-z\d]+$/)
169 {
170   # $jobspec is an Arvados UUID, not a JSON job specification
171   $Job = api_call("jobs/get", uuid => $jobspec);
172   $local_job = 0;
173 }
174 else
175 {
176   $local_job = JSON::decode_json($jobspec);
177 }
178
179
180 # Make sure our workers (our slurm nodes, localhost, or whatever) are
181 # at least able to run basic commands: they aren't down or severely
182 # misconfigured.
183 my $cmd = ['true'];
184 if (($Job || $local_job)->{docker_image_locator}) {
185   $cmd = [$docker_bin, 'ps', '-q'];
186 }
187 Log(undef, "Sanity check is `@$cmd`");
188 my ($exited, $stdout, $stderr) = srun_sync(
189   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
190   $cmd,
191   {label => "sanity check"});
192 if ($exited != 0) {
193   Log(undef, "Sanity check failed: ".exit_status_s($exited));
194   exit EX_TEMPFAIL;
195 }
196 Log(undef, "Sanity check OK");
197
198
199 my $User = api_call("users/current");
200
201 if (!$local_job) {
202   if (!$force_unlock) {
203     # Claim this job, and make sure nobody else does
204     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
205     if ($@) {
206       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
207       exit EX_TEMPFAIL;
208     };
209   }
210 }
211 else
212 {
213   if (!$resume_stash)
214   {
215     map { croak ("No $_ specified") unless $local_job->{$_} }
216     qw(script script_version script_parameters);
217   }
218
219   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
220   $local_job->{'started_at'} = gmtime;
221   $local_job->{'state'} = 'Running';
222
223   $Job = api_call("jobs/create", job => $local_job);
224 }
225 $job_id = $Job->{'uuid'};
226
227 my $keep_logfile = $job_id . '.log.txt';
228 log_writer_start($keep_logfile);
229
230 $Job->{'runtime_constraints'} ||= {};
231 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
232 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
233
234 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
235 if ($? == 0) {
236   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
237   chomp($gem_versions);
238   chop($gem_versions);  # Closing parentheses
239 } else {
240   $gem_versions = "";
241 }
242 Log(undef,
243     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
244
245 Log (undef, "check slurm allocation");
246 my @slot;
247 my @node;
248 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
249 my @sinfo;
250 if (!$have_slurm)
251 {
252   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
253   push @sinfo, "$localcpus localhost";
254 }
255 if (exists $ENV{SLURM_NODELIST})
256 {
257   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
258 }
259 foreach (@sinfo)
260 {
261   my ($ncpus, $slurm_nodelist) = split;
262   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
263
264   my @nodelist;
265   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
266   {
267     my $nodelist = $1;
268     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
269     {
270       my $ranges = $1;
271       foreach (split (",", $ranges))
272       {
273         my ($a, $b);
274         if (/(\d+)-(\d+)/)
275         {
276           $a = $1;
277           $b = $2;
278         }
279         else
280         {
281           $a = $_;
282           $b = $_;
283         }
284         push @nodelist, map {
285           my $n = $nodelist;
286           $n =~ s/\[[-,\d]+\]/$_/;
287           $n;
288         } ($a..$b);
289       }
290     }
291     else
292     {
293       push @nodelist, $nodelist;
294     }
295   }
296   foreach my $nodename (@nodelist)
297   {
298     Log (undef, "node $nodename - $ncpus slots");
299     my $node = { name => $nodename,
300                  ncpus => $ncpus,
301                  # The number of consecutive times a task has been dispatched
302                  # to this node and failed.
303                  losing_streak => 0,
304                  # The number of consecutive times that SLURM has reported
305                  # a node failure since the last successful task.
306                  fail_count => 0,
307                  # Don't dispatch work to this node until this time
308                  # (in seconds since the epoch) has passed.
309                  hold_until => 0 };
310     foreach my $cpu (1..$ncpus)
311     {
312       push @slot, { node => $node,
313                     cpu => $cpu };
314     }
315   }
316   push @node, @nodelist;
317 }
318
319
320
321 # Ensure that we get one jobstep running on each allocated node before
322 # we start overloading nodes with concurrent steps
323
324 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
325
326
327 $Job->update_attributes(
328   'tasks_summary' => { 'failed' => 0,
329                        'todo' => 1,
330                        'running' => 0,
331                        'done' => 0 });
332
333 Log (undef, "start");
334 $SIG{'INT'} = sub { $main::please_freeze = 1; };
335 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
336 $SIG{'TERM'} = \&croak;
337 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
338 $SIG{'ALRM'} = sub { $main::please_info = 1; };
339 $SIG{'CONT'} = sub { $main::please_continue = 1; };
340 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
341
342 $main::please_freeze = 0;
343 $main::please_info = 0;
344 $main::please_continue = 0;
345 $main::please_refresh = 0;
346 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
347
348 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
349 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
350 $ENV{"JOB_UUID"} = $job_id;
351
352
353 my @jobstep_todo = ();
354 my @jobstep_done = ();
355 my @jobstep_tomerge = ();
356 my $jobstep_tomerge_level = 0;
357 my $squeue_checked = 0;
358 my $sinfo_checked = 0;
359 my $latest_refresh = scalar time;
360
361
362
363 if (defined $Job->{thawedfromkey})
364 {
365   thaw ($Job->{thawedfromkey});
366 }
367 else
368 {
369   my $first_task = api_call("job_tasks/create", job_task => {
370     'job_uuid' => $Job->{'uuid'},
371     'sequence' => 0,
372     'qsequence' => 0,
373     'parameters' => {},
374   });
375   push @jobstep, { 'level' => 0,
376                    'failures' => 0,
377                    'arvados_task' => $first_task,
378                  };
379   push @jobstep_todo, 0;
380 }
381
382
383 if (!$have_slurm)
384 {
385   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
386 }
387
388 my $build_script = handle_readall(\*DATA);
389 my $nodelist = join(",", @node);
390 my $git_tar_count = 0;
391
392 if (!defined $no_clear_tmp) {
393   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
394   # up work directories crunch_tmp/work, crunch_tmp/opt,
395   # crunch_tmp/src*.
396   #
397   # TODO: When #5036 is done and widely deployed, we can limit mount's
398   # -t option to simply fuse.keep.
399   my ($exited, $stdout, $stderr) = srun_sync(
400     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
401     ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
402     {label => "clean work dirs"});
403   if ($exited != 0) {
404     exit(EX_RETRY_UNLOCKED);
405   }
406 }
407
408 # If this job requires a Docker image, install that.
409 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
410 if ($docker_locator = $Job->{docker_image_locator}) {
411   Log (undef, "Install docker image $docker_locator");
412   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
413   if (!$docker_hash)
414   {
415     croak("No Docker image hash found from locator $docker_locator");
416   }
417   Log (undef, "docker image hash is $docker_hash");
418   $docker_stream =~ s/^\.//;
419   my $docker_install_script = qq{
420 loaded() {
421   [[ \$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) = \Q$docker_hash\E ]]
422 }
423 if loaded 2>/dev/null; then
424   echo >&2 "image is already present"
425   exit 0
426 fi
427 echo >&2 "docker image is not present; loading"
428 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
429 if ! loaded; then
430   echo >&2 "`docker load` exited 0, but image is not found (!)"
431   exit 1
432 fi
433 echo >&2 "image loaded successfully"
434 };
435
436   my ($exited, $stdout, $stderr) = srun_sync(
437     ["srun", "--nodelist=" . join(',', @node)],
438     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
439     {label => "load docker image"});
440   if ($exited != 0)
441   {
442     exit(EX_RETRY_UNLOCKED);
443   }
444
445   # Determine whether this version of Docker supports memory+swap limits.
446   ($exited, $stdout, $stderr) = srun_sync(
447     ["srun", "--nodes=1"],
448     [$docker_bin, 'run', '--help'],
449     {label => "check --memory-swap feature"});
450   $docker_limitmem = ($stdout =~ /--memory-swap/);
451
452   # Find a non-root Docker user to use.
453   # Tries the default user for the container, then 'crunch', then 'nobody',
454   # testing for whether the actual user id is non-zero.  This defends against
455   # mistakes but not malice, but we intend to harden the security in the future
456   # so we don't want anyone getting used to their jobs running as root in their
457   # Docker containers.
458   my @tryusers = ("", "crunch", "nobody");
459   foreach my $try_user (@tryusers) {
460     my $label;
461     my $try_user_arg;
462     if ($try_user eq "") {
463       $label = "check whether default user is UID 0";
464       $try_user_arg = "";
465     } else {
466       $label = "check whether user '$try_user' is UID 0";
467       $try_user_arg = "--user=$try_user";
468     }
469     my ($exited, $stdout, $stderr) = srun_sync(
470       ["srun", "--nodes=1"],
471       ["/bin/sh", "-ec",
472        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
473       {label => $label});
474     chomp($stdout);
475     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
476       $dockeruserarg = $try_user_arg;
477       if ($try_user eq "") {
478         Log(undef, "Container will run with default user");
479       } else {
480         Log(undef, "Container will run with $dockeruserarg");
481       }
482       last;
483     }
484   }
485
486   if (!defined $dockeruserarg) {
487     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
488   }
489
490   if ($Job->{arvados_sdk_version}) {
491     # The job also specifies an Arvados SDK version.  Add the SDKs to the
492     # tar file for the build script to install.
493     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
494                        $Job->{arvados_sdk_version}));
495     add_git_archive("git", "--git-dir=$git_dir", "archive",
496                     "--prefix=.arvados.sdk/",
497                     $Job->{arvados_sdk_version}, "sdk");
498   }
499 }
500
501 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
502   # If script_version looks like an absolute path, *and* the --git-dir
503   # argument was not given -- which implies we were not invoked by
504   # crunch-dispatch -- we will use the given path as a working
505   # directory instead of resolving script_version to a git commit (or
506   # doing anything else with git).
507   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
508   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
509 }
510 else {
511   # Resolve the given script_version to a git commit sha1. Also, if
512   # the repository is remote, clone it into our local filesystem: this
513   # ensures "git archive" will work, and is necessary to reliably
514   # resolve a symbolic script_version like "master^".
515   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
516
517   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
518
519   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
520
521   # If we're running under crunch-dispatch, it will have already
522   # pulled the appropriate source tree into its own repository, and
523   # given us that repo's path as $git_dir.
524   #
525   # If we're running a "local" job, we might have to fetch content
526   # from a remote repository.
527   #
528   # (Currently crunch-dispatch gives a local path with --git-dir, but
529   # we might as well accept URLs there too in case it changes its
530   # mind.)
531   my $repo = $git_dir || $Job->{'repository'};
532
533   # Repository can be remote or local. If remote, we'll need to fetch it
534   # to a local dir before doing `git log` et al.
535   my $repo_location;
536
537   if ($repo =~ m{://|^[^/]*:}) {
538     # $repo is a git url we can clone, like git:// or https:// or
539     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
540     # not recognized here because distinguishing that from a local
541     # path is too fragile. If you really need something strange here,
542     # use the ssh:// form.
543     $repo_location = 'remote';
544   } elsif ($repo =~ m{^\.*/}) {
545     # $repo is a local path to a git index. We'll also resolve ../foo
546     # to ../foo/.git if the latter is a directory. To help
547     # disambiguate local paths from named hosted repositories, this
548     # form must be given as ./ or ../ if it's a relative path.
549     if (-d "$repo/.git") {
550       $repo = "$repo/.git";
551     }
552     $repo_location = 'local';
553   } else {
554     # $repo is none of the above. It must be the name of a hosted
555     # repository.
556     my $arv_repo_list = api_call("repositories/list",
557                                  'filters' => [['name','=',$repo]]);
558     my @repos_found = @{$arv_repo_list->{'items'}};
559     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
560     if ($n_found > 0) {
561       Log(undef, "Repository '$repo' -> "
562           . join(", ", map { $_->{'uuid'} } @repos_found));
563     }
564     if ($n_found != 1) {
565       croak("Error: Found $n_found repositories with name '$repo'.");
566     }
567     $repo = $repos_found[0]->{'fetch_url'};
568     $repo_location = 'remote';
569   }
570   Log(undef, "Using $repo_location repository '$repo'");
571   $ENV{"CRUNCH_SRC_URL"} = $repo;
572
573   # Resolve given script_version (we'll call that $treeish here) to a
574   # commit sha1 ($commit).
575   my $treeish = $Job->{'script_version'};
576   my $commit;
577   if ($repo_location eq 'remote') {
578     # We minimize excess object-fetching by re-using the same bare
579     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
580     # just keep adding remotes to it as needed.
581     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
582     my $gitcmd = "git --git-dir=\Q$local_repo\E";
583
584     # Set up our local repo for caching remote objects, making
585     # archives, etc.
586     if (!-d $local_repo) {
587       make_path($local_repo) or croak("Error: could not create $local_repo");
588     }
589     # This works (exits 0 and doesn't delete fetched objects) even
590     # if $local_repo is already initialized:
591     `$gitcmd init --bare`;
592     if ($?) {
593       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
594     }
595
596     # If $treeish looks like a hash (or abbrev hash) we look it up in
597     # our local cache first, since that's cheaper. (We don't want to
598     # do that with tags/branches though -- those change over time, so
599     # they should always be resolved by the remote repo.)
600     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
601       # Hide stderr because it's normal for this to fail:
602       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
603       if ($? == 0 &&
604           # Careful not to resolve a branch named abcdeff to commit 1234567:
605           $sha1 =~ /^$treeish/ &&
606           $sha1 =~ /^([0-9a-f]{40})$/s) {
607         $commit = $1;
608         Log(undef, "Commit $commit already present in $local_repo");
609       }
610     }
611
612     if (!defined $commit) {
613       # If $treeish isn't just a hash or abbrev hash, or isn't here
614       # yet, we need to fetch the remote to resolve it correctly.
615
616       # First, remove all local heads. This prevents a name that does
617       # not exist on the remote from resolving to (or colliding with)
618       # a previously fetched branch or tag (possibly from a different
619       # remote).
620       remove_tree("$local_repo/refs/heads", {keep_root => 1});
621
622       Log(undef, "Fetching objects from $repo to $local_repo");
623       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
624       if ($?) {
625         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
626       }
627     }
628
629     # Now that the data is all here, we will use our local repo for
630     # the rest of our git activities.
631     $repo = $local_repo;
632   }
633
634   my $gitcmd = "git --git-dir=\Q$repo\E";
635   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
636   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
637     croak("`$gitcmd rev-list` exited "
638           .exit_status_s($?)
639           .", '$treeish' not found, giving up");
640   }
641   $commit = $1;
642   Log(undef, "Version $treeish is commit $commit");
643
644   if ($commit ne $Job->{'script_version'}) {
645     # Record the real commit id in the database, frozentokey, logs,
646     # etc. -- instead of an abbreviation or a branch name which can
647     # become ambiguous or point to a different commit in the future.
648     if (!$Job->update_attributes('script_version' => $commit)) {
649       croak("Error: failed to update job's script_version attribute");
650     }
651   }
652
653   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
654   add_git_archive("$gitcmd archive ''\Q$commit\E");
655 }
656
657 my $git_archive = combined_git_archive();
658 if (!defined $git_archive) {
659   Log(undef, "Skip install phase (no git archive)");
660   if ($have_slurm) {
661     Log(undef, "Warning: This probably means workers have no source tree!");
662   }
663 }
664 else {
665   my $exited;
666   my $install_script_tries_left = 3;
667   for (my $attempts = 0; $attempts < 3; $attempts++) {
668     my @srunargs = ("srun",
669                     "--nodelist=$nodelist",
670                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
671     my @execargs = ("sh", "-c",
672                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
673
674     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
675     my ($stdout, $stderr);
676     ($exited, $stdout, $stderr) = srun_sync(
677       \@srunargs, \@execargs,
678       {label => "run install script on all workers"},
679       $build_script . $git_archive);
680
681     my $stderr_anything_from_script = 0;
682     for my $line (split(/\n/, $stderr)) {
683       if ($line !~ /^(srun: error: |starting: \[)/) {
684         $stderr_anything_from_script = 1;
685       }
686     }
687
688     last if $exited == 0 || $main::please_freeze;
689
690     # If the install script fails but doesn't print an error message,
691     # the next thing anyone is likely to do is just run it again in
692     # case it was a transient problem like "slurm communication fails
693     # because the network isn't reliable enough". So we'll just do
694     # that ourselves (up to 3 attempts in total). OTOH, if there is an
695     # error message, the problem is more likely to have a real fix and
696     # we should fail the job so the fixing process can start, instead
697     # of doing 2 more attempts.
698     last if $stderr_anything_from_script;
699   }
700
701   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
702     unlink($tar_filename);
703   }
704
705   if ($exited != 0) {
706     croak("Giving up");
707   }
708 }
709
710 foreach (qw (script script_version script_parameters runtime_constraints))
711 {
712   Log (undef,
713        "$_ " .
714        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
715 }
716 foreach (split (/\n/, $Job->{knobs}))
717 {
718   Log (undef, "knob " . $_);
719 }
720 my $resp = api_call(
721   'nodes/list',
722   'filters' => [['hostname', 'in', \@node]],
723   'order' => 'hostname',
724   'limit' => scalar(@node),
725     );
726 for my $n (@{$resp->{items}}) {
727   Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
728 }
729
730
731
732 $main::success = undef;
733
734
735
736 ONELEVEL:
737
738 my $thisround_succeeded = 0;
739 my $thisround_failed = 0;
740 my $thisround_failed_multiple = 0;
741 my $working_slot_count = scalar(@slot);
742
743 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
744                        or $a <=> $b } @jobstep_todo;
745 my $level = $jobstep[$jobstep_todo[0]]->{level};
746
747 my $initial_tasks_this_level = 0;
748 foreach my $id (@jobstep_todo) {
749   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
750 }
751
752 # If the number of tasks scheduled at this level #T is smaller than the number
753 # of slots available #S, only use the first #T slots, or the first slot on
754 # each node, whichever number is greater.
755 #
756 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
757 # based on these numbers.  Using fewer slots makes more resources available
758 # to each individual task, which should normally be a better strategy when
759 # there are fewer of them running with less parallelism.
760 #
761 # Note that this calculation is not redone if the initial tasks at
762 # this level queue more tasks at the same level.  This may harm
763 # overall task throughput for that level.
764 my @freeslot;
765 if ($initial_tasks_this_level < @node) {
766   @freeslot = (0..$#node);
767 } elsif ($initial_tasks_this_level < @slot) {
768   @freeslot = (0..$initial_tasks_this_level - 1);
769 } else {
770   @freeslot = (0..$#slot);
771 }
772 my $round_num_freeslots = scalar(@freeslot);
773 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
774
775 my %round_max_slots = ();
776 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
777   my $this_slot = $slot[$freeslot[$ii]];
778   my $node_name = $this_slot->{node}->{name};
779   $round_max_slots{$node_name} ||= $this_slot->{cpu};
780   last if (scalar(keys(%round_max_slots)) >= @node);
781 }
782
783 Log(undef, "start level $level with $round_num_freeslots slots");
784 my @holdslot;
785 my %reader;
786 my $progress_is_dirty = 1;
787 my $progress_stats_updated = 0;
788
789 update_progress_stats();
790
791
792 THISROUND:
793 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
794 {
795   # Don't create new tasks if we already know the job's final result.
796   last if defined($main::success);
797
798   my $id = $jobstep_todo[$todo_ptr];
799   my $Jobstep = $jobstep[$id];
800   if ($Jobstep->{level} != $level)
801   {
802     next;
803   }
804
805   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
806   set_nonblocking($reader{$id});
807
808   my $childslot = $freeslot[0];
809   my $childnode = $slot[$childslot]->{node};
810   my $childslotname = join (".",
811                             $slot[$childslot]->{node}->{name},
812                             $slot[$childslot]->{cpu});
813
814   my $childpid = fork();
815   if ($childpid == 0)
816   {
817     $SIG{'INT'} = 'DEFAULT';
818     $SIG{'QUIT'} = 'DEFAULT';
819     $SIG{'TERM'} = 'DEFAULT';
820
821     foreach (values (%reader))
822     {
823       close($_);
824     }
825     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
826     open(STDOUT,">&writer");
827     open(STDERR,">&writer");
828
829     undef $dbh;
830     undef $sth;
831
832     delete $ENV{"GNUPGHOME"};
833     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
834     $ENV{"TASK_QSEQUENCE"} = $id;
835     $ENV{"TASK_SEQUENCE"} = $level;
836     $ENV{"JOB_SCRIPT"} = $Job->{script};
837     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
838       $param =~ tr/a-z/A-Z/;
839       $ENV{"JOB_PARAMETER_$param"} = $value;
840     }
841     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
842     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
843     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
844     $ENV{"HOME"} = $ENV{"TASK_WORK"};
845     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
846     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
847     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
848
849     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
850
851     $ENV{"GZIP"} = "-n";
852
853     my @srunargs = (
854       "srun",
855       "--nodelist=".$childnode->{name},
856       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
857       "--job-name=$job_id.$id.$$",
858         );
859
860     my $stdbuf = " stdbuf --output=0 --error=0 ";
861
862     my $arv_file_cache = "";
863     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
864       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
865     }
866
867     my $command =
868         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
869         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
870         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
871         # These environment variables get used explicitly later in
872         # $command.  No tool is expected to read these values directly.
873         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
874         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
875         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
876         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
877         .q{&& declare -a VOLUMES=() }
878         .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
879         .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
880         .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
881
882     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
883     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
884     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
885
886     if ($docker_hash)
887     {
888       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
889       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
890       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
891       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
892       # We only set memory limits if Docker lets us limit both memory and swap.
893       # Memory limits alone have been supported longer, but subprocesses tend
894       # to get SIGKILL if they exceed that without any swap limit set.
895       # See #5642 for additional background.
896       if ($docker_limitmem) {
897         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
898       }
899
900       # The source tree and $destdir directory (which we have
901       # installed on the worker host) are available in the container,
902       # under the same path.
903       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
904       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
905
906       # Currently, we make the "by_pdh" directory in arv-mount's mount
907       # point appear at /keep inside the container (instead of using
908       # the same path as the host like we do with CRUNCH_SRC and
909       # CRUNCH_INSTALL). However, crunch scripts and utilities must
910       # not rely on this. They must use $TASK_KEEPMOUNT.
911       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
912       $ENV{TASK_KEEPMOUNT} = "/keep";
913
914       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
915       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
916       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
917
918       # TASK_WORK is almost exactly like a docker data volume: it
919       # starts out empty, is writable, and persists until no
920       # containers use it any more. We don't use --volumes-from to
921       # share it with other containers: it is only accessible to this
922       # task, and it goes away when this task stops.
923       #
924       # However, a docker data volume is writable only by root unless
925       # the mount point already happens to exist in the container with
926       # different permissions. Therefore, we [1] assume /tmp already
927       # exists in the image and is writable by the crunch user; [2]
928       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
929       # writable if they are created by docker while setting up the
930       # other --volumes); and [3] create $TASK_WORK inside the
931       # container using $build_script.
932       $command .= "--volume=/tmp ";
933       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
934       $ENV{"HOME"} = $ENV{"TASK_WORK"};
935       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
936
937       # TODO: Share a single JOB_WORK volume across all task
938       # containers on a given worker node, and delete it when the job
939       # ends (and, in case that doesn't work, when the next job
940       # starts).
941       #
942       # For now, use the same approach as TASK_WORK above.
943       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
944
945       # Bind mount the crunchrunner binary and host TLS certificates file into
946       # the container.
947       $command .= '"${VOLUMES[@]}" ';
948
949       while (my ($env_key, $env_val) = each %ENV)
950       {
951         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
952           $command .= "--env=\Q$env_key=$env_val\E ";
953         }
954       }
955       $command .= "--env=\QHOME=$ENV{HOME}\E ";
956       $command .= "\Q$docker_hash\E ";
957
958       if ($Job->{arvados_sdk_version}) {
959         $command .= $stdbuf;
960         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
961       } else {
962         $command .= "/bin/sh -c \'python -c " .
963             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
964             ">&2 2>/dev/null; " .
965             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
966             "if which stdbuf >/dev/null ; then " .
967             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
968             " else " .
969             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
970             " fi\'";
971       }
972     } else {
973       # Non-docker run
974       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
975       $command .= $stdbuf;
976       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
977     }
978
979     my @execargs = ('bash', '-c', $command);
980     srun (\@srunargs, \@execargs, undef, $build_script);
981     # exec() failed, we assume nothing happened.
982     die "srun() failed on build script\n";
983   }
984   close("writer");
985   if (!defined $childpid)
986   {
987     close $reader{$id};
988     delete $reader{$id};
989     next;
990   }
991   shift @freeslot;
992   $proc{$childpid} = {
993     jobstepidx => $id,
994     time => time,
995     slot => $childslot,
996     jobstepname => "$job_id.$id.$childpid",
997   };
998   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
999   $slot[$childslot]->{pid} = $childpid;
1000
1001   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1002   Log ($id, "child $childpid started on $childslotname");
1003   $Jobstep->{starttime} = time;
1004   $Jobstep->{node} = $childnode->{name};
1005   $Jobstep->{slotindex} = $childslot;
1006   delete $Jobstep->{stderr};
1007   delete $Jobstep->{finishtime};
1008   delete $Jobstep->{tempfail};
1009
1010   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1011   $Jobstep->{'arvados_task'}->save;
1012
1013   splice @jobstep_todo, $todo_ptr, 1;
1014   --$todo_ptr;
1015
1016   $progress_is_dirty = 1;
1017
1018   while (!@freeslot
1019          ||
1020          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1021   {
1022     last THISROUND if $main::please_freeze;
1023     if ($main::please_info)
1024     {
1025       $main::please_info = 0;
1026       freeze();
1027       create_output_collection();
1028       save_meta(1);
1029       update_progress_stats();
1030     }
1031     my $gotsome
1032         = readfrompipes ()
1033         + reapchildren ();
1034     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1035     {
1036       check_refresh_wanted();
1037       check_squeue();
1038       update_progress_stats();
1039     }
1040     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1041     {
1042       update_progress_stats();
1043     }
1044     if (!$gotsome) {
1045       select (undef, undef, undef, 0.1);
1046     }
1047     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1048                                         $_->{node}->{hold_count} < 4 } @slot);
1049     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1050         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1051     {
1052       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1053           .($thisround_failed+$thisround_succeeded)
1054           .") -- giving up on this round";
1055       Log (undef, $message);
1056       last THISROUND;
1057     }
1058
1059     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1060     for (my $i=$#freeslot; $i>=0; $i--) {
1061       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1062         push @holdslot, (splice @freeslot, $i, 1);
1063       }
1064     }
1065     for (my $i=$#holdslot; $i>=0; $i--) {
1066       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1067         push @freeslot, (splice @holdslot, $i, 1);
1068       }
1069     }
1070
1071     # give up if no nodes are succeeding
1072     if ($working_slot_count < 1) {
1073       Log(undef, "Every node has failed -- giving up");
1074       last THISROUND;
1075     }
1076   }
1077 }
1078
1079
1080 push @freeslot, splice @holdslot;
1081 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1082
1083
1084 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1085 while (%proc)
1086 {
1087   if ($main::please_continue) {
1088     $main::please_continue = 0;
1089     goto THISROUND;
1090   }
1091   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1092   readfrompipes ();
1093   if (!reapchildren())
1094   {
1095     check_refresh_wanted();
1096     check_squeue();
1097     update_progress_stats();
1098     select (undef, undef, undef, 0.1);
1099     killem (keys %proc) if $main::please_freeze;
1100   }
1101 }
1102
1103 update_progress_stats();
1104 freeze_if_want_freeze();
1105
1106
1107 if (!defined $main::success)
1108 {
1109   if (!@jobstep_todo) {
1110     $main::success = 1;
1111   } elsif ($working_slot_count < 1) {
1112     save_output_collection();
1113     save_meta();
1114     exit(EX_RETRY_UNLOCKED);
1115   } elsif ($thisround_succeeded == 0 &&
1116            ($thisround_failed == 0 || $thisround_failed > 4)) {
1117     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1118     Log (undef, $message);
1119     $main::success = 0;
1120   }
1121 }
1122
1123 goto ONELEVEL if !defined $main::success;
1124
1125
1126 release_allocation();
1127 freeze();
1128 my $collated_output = save_output_collection();
1129 Log (undef, "finish");
1130
1131 save_meta();
1132
1133 my $final_state;
1134 if ($collated_output && $main::success) {
1135   $final_state = 'Complete';
1136 } else {
1137   $final_state = 'Failed';
1138 }
1139 $Job->update_attributes('state' => $final_state);
1140
1141 exit (($final_state eq 'Complete') ? 0 : 1);
1142
1143
1144
1145 sub update_progress_stats
1146 {
1147   $progress_stats_updated = time;
1148   return if !$progress_is_dirty;
1149   my ($todo, $done, $running) = (scalar @jobstep_todo,
1150                                  scalar @jobstep_done,
1151                                  scalar keys(%proc));
1152   $Job->{'tasks_summary'} ||= {};
1153   $Job->{'tasks_summary'}->{'todo'} = $todo;
1154   $Job->{'tasks_summary'}->{'done'} = $done;
1155   $Job->{'tasks_summary'}->{'running'} = $running;
1156   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1157   Log (undef, "status: $done done, $running running, $todo todo");
1158   $progress_is_dirty = 0;
1159 }
1160
1161
1162
1163 sub reapchildren
1164 {
1165   my $children_reaped = 0;
1166   my @successful_task_uuids = ();
1167
1168   while((my $pid = waitpid (-1, WNOHANG)) > 0)
1169   {
1170     my $childstatus = $?;
1171
1172     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1173                     . "."
1174                     . $slot[$proc{$pid}->{slot}]->{cpu});
1175     my $jobstepidx = $proc{$pid}->{jobstepidx};
1176
1177     $children_reaped++;
1178     my $elapsed = time - $proc{$pid}->{time};
1179     my $Jobstep = $jobstep[$jobstepidx];
1180
1181     my $exitvalue = $childstatus >> 8;
1182     my $exitinfo = "exit ".exit_status_s($childstatus);
1183     $Jobstep->{'arvados_task'}->reload;
1184     my $task_success = $Jobstep->{'arvados_task'}->{success};
1185
1186     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1187
1188     if (!defined $task_success) {
1189       # task did not indicate one way or the other --> fail
1190       Log($jobstepidx, sprintf(
1191             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1192             exit_status_s($childstatus)));
1193       $Jobstep->{'arvados_task'}->{success} = 0;
1194       $Jobstep->{'arvados_task'}->save;
1195       $task_success = 0;
1196     }
1197
1198     if (!$task_success)
1199     {
1200       my $temporary_fail;
1201       $temporary_fail ||= $Jobstep->{tempfail};
1202       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1203
1204       ++$thisround_failed;
1205       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1206
1207       # Check for signs of a failed or misconfigured node
1208       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1209           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1210         # Don't count this against jobstep failure thresholds if this
1211         # node is already suspected faulty and srun exited quickly
1212         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1213             $elapsed < 5) {
1214           Log ($jobstepidx, "blaming failure on suspect node " .
1215                $slot[$proc{$pid}->{slot}]->{node}->{name});
1216           $temporary_fail ||= 1;
1217         }
1218         ban_node_by_slot($proc{$pid}->{slot});
1219       }
1220
1221       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1222                                 ++$Jobstep->{'failures'},
1223                                 $temporary_fail ? 'temporary' : 'permanent',
1224                                 $elapsed));
1225
1226       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1227         # Give up on this task, and the whole job
1228         $main::success = 0;
1229       }
1230       # Put this task back on the todo queue
1231       push @jobstep_todo, $jobstepidx;
1232       $Job->{'tasks_summary'}->{'failed'}++;
1233     }
1234     else # task_success
1235     {
1236       push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1237       ++$thisround_succeeded;
1238       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1239       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1240       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1241       push @jobstep_done, $jobstepidx;
1242       Log ($jobstepidx, "success in $elapsed seconds");
1243     }
1244     $Jobstep->{exitcode} = $childstatus;
1245     $Jobstep->{finishtime} = time;
1246     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1247     $Jobstep->{'arvados_task'}->save;
1248     process_stderr_final ($jobstepidx);
1249     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1250                               length($Jobstep->{'arvados_task'}->{output}),
1251                               $Jobstep->{'arvados_task'}->{output}));
1252
1253     close $reader{$jobstepidx};
1254     delete $reader{$jobstepidx};
1255     delete $slot[$proc{$pid}->{slot}]->{pid};
1256     push @freeslot, $proc{$pid}->{slot};
1257     delete $proc{$pid};
1258
1259     $progress_is_dirty = 1;
1260   }
1261
1262   if (scalar(@successful_task_uuids) > 0)
1263   {
1264     Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1265     # Load new tasks
1266     my $newtask_list = [];
1267     my $newtask_results;
1268     do {
1269       $newtask_results = api_call(
1270         "job_tasks/list",
1271         'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1272         'order' => 'qsequence',
1273         'offset' => scalar(@$newtask_list),
1274           );
1275       push(@$newtask_list, @{$newtask_results->{items}});
1276     } while (@{$newtask_results->{items}});
1277     Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1278     foreach my $arvados_task (@$newtask_list) {
1279       my $jobstep = {
1280         'level' => $arvados_task->{'sequence'},
1281         'failures' => 0,
1282         'arvados_task' => $arvados_task
1283       };
1284       push @jobstep, $jobstep;
1285       push @jobstep_todo, $#jobstep;
1286     }
1287   }
1288
1289   return $children_reaped;
1290 }
1291
1292 sub check_refresh_wanted
1293 {
1294   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1295   if (@stat &&
1296       $stat[9] > $latest_refresh &&
1297       # ...and we have actually locked the job record...
1298       $job_id eq $Job->{'uuid'}) {
1299     $latest_refresh = scalar time;
1300     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1301     for my $attr ('cancelled_at',
1302                   'cancelled_by_user_uuid',
1303                   'cancelled_by_client_uuid',
1304                   'state') {
1305       $Job->{$attr} = $Job2->{$attr};
1306     }
1307     if ($Job->{'state'} ne "Running") {
1308       if ($Job->{'state'} eq "Cancelled") {
1309         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1310       } else {
1311         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1312       }
1313       $main::success = 0;
1314       $main::please_freeze = 1;
1315     }
1316   }
1317 }
1318
1319 sub check_squeue
1320 {
1321   my $last_squeue_check = $squeue_checked;
1322
1323   # Do not call `squeue` or check the kill list more than once every
1324   # 15 seconds.
1325   return if $last_squeue_check > time - 15;
1326   $squeue_checked = time;
1327
1328   # Look for children from which we haven't received stderr data since
1329   # the last squeue check. If no such children exist, all procs are
1330   # alive and there's no need to even look at squeue.
1331   #
1332   # As long as the crunchstat poll interval (10s) is shorter than the
1333   # squeue check interval (15s) this should make the squeue check an
1334   # infrequent event.
1335   my $silent_procs = 0;
1336   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1337   {
1338     if (!exists($js->{stderr_at}))
1339     {
1340       $js->{stderr_at} = 0;
1341     }
1342     if ($js->{stderr_at} < $last_squeue_check)
1343     {
1344       $silent_procs++;
1345     }
1346   }
1347   return if $silent_procs == 0;
1348
1349   # use killem() on procs whose killtime is reached
1350   while (my ($pid, $procinfo) = each %proc)
1351   {
1352     my $js = $jobstep[$procinfo->{jobstepidx}];
1353     if (exists $procinfo->{killtime}
1354         && $procinfo->{killtime} <= time
1355         && $js->{stderr_at} < $last_squeue_check)
1356     {
1357       my $sincewhen = "";
1358       if ($js->{stderr_at}) {
1359         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1360       }
1361       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1362       killem ($pid);
1363     }
1364   }
1365
1366   if (!$have_slurm)
1367   {
1368     # here is an opportunity to check for mysterious problems with local procs
1369     return;
1370   }
1371
1372   # Get a list of steps still running.  Note: squeue(1) says --steps
1373   # selects a format (which we override anyway) and allows us to
1374   # specify which steps we're interested in (which we don't).
1375   # Importantly, it also changes the meaning of %j from "job name" to
1376   # "step name" and (although this isn't mentioned explicitly in the
1377   # docs) switches from "one line per job" mode to "one line per step"
1378   # mode. Without it, we'd just get a list of one job, instead of a
1379   # list of N steps.
1380   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1381   if ($? != 0)
1382   {
1383     Log(undef, "warning: squeue exit status $? ($!)");
1384     return;
1385   }
1386   chop @squeue;
1387
1388   # which of my jobsteps are running, according to squeue?
1389   my %ok;
1390   for my $jobstepname (@squeue)
1391   {
1392     $ok{$jobstepname} = 1;
1393   }
1394
1395   # Check for child procs >60s old and not mentioned by squeue.
1396   while (my ($pid, $procinfo) = each %proc)
1397   {
1398     if ($procinfo->{time} < time - 60
1399         && $procinfo->{jobstepname}
1400         && !exists $ok{$procinfo->{jobstepname}}
1401         && !exists $procinfo->{killtime})
1402     {
1403       # According to slurm, this task has ended (successfully or not)
1404       # -- but our srun child hasn't exited. First we must wait (30
1405       # seconds) in case this is just a race between communication
1406       # channels. Then, if our srun child process still hasn't
1407       # terminated, we'll conclude some slurm communication
1408       # error/delay has caused the task to die without notifying srun,
1409       # and we'll kill srun ourselves.
1410       $procinfo->{killtime} = time + 30;
1411       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1412     }
1413   }
1414 }
1415
1416 sub check_sinfo
1417 {
1418   # If a node fails in a multi-node "srun" call during job setup, the call
1419   # may hang instead of exiting with a nonzero code.  This function checks
1420   # "sinfo" for the health of the nodes that were allocated and ensures that
1421   # they are all still in the "alloc" state.  If a node that is allocated to
1422   # this job is not in "alloc" state, then set please_freeze.
1423   #
1424   # This is only called from srun_sync() for node configuration.  If a
1425   # node fails doing actual work, there are other recovery mechanisms.
1426
1427   # Do not call `sinfo` more than once every 15 seconds.
1428   return if $sinfo_checked > time - 15;
1429   $sinfo_checked = time;
1430
1431   # The output format "%t" means output node states.
1432   my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
1433   if ($? != 0)
1434   {
1435     Log(undef, "warning: sinfo exit status $? ($!)");
1436     return;
1437   }
1438   chop @sinfo;
1439
1440   foreach (@sinfo)
1441   {
1442     if ($_ != "alloc" && $_ != "alloc*") {
1443       $main::please_freeze = 1;
1444     }
1445   }
1446 }
1447
1448 sub release_allocation
1449 {
1450   if ($have_slurm)
1451   {
1452     Log (undef, "release job allocation");
1453     system "scancel $ENV{SLURM_JOB_ID}";
1454   }
1455 }
1456
1457
1458 sub readfrompipes
1459 {
1460   my $gotsome = 0;
1461   my %fd_job;
1462   my $sel = IO::Select->new();
1463   foreach my $jobstepidx (keys %reader)
1464   {
1465     my $fd = $reader{$jobstepidx};
1466     $sel->add($fd);
1467     $fd_job{$fd} = $jobstepidx;
1468
1469     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1470       $sel->add($stdout_fd);
1471       $fd_job{$stdout_fd} = $jobstepidx;
1472     }
1473   }
1474   # select on all reader fds with 0.1s timeout
1475   my @ready_fds = $sel->can_read(0.1);
1476   foreach my $fd (@ready_fds)
1477   {
1478     my $buf;
1479     if (0 < sysread ($fd, $buf, 65536))
1480     {
1481       $gotsome = 1;
1482       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1483
1484       my $jobstepidx = $fd_job{$fd};
1485       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1486         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1487         next;
1488       }
1489
1490       $jobstep[$jobstepidx]->{stderr_at} = time;
1491       $jobstep[$jobstepidx]->{stderr} .= $buf;
1492
1493       # Consume everything up to the last \n
1494       preprocess_stderr ($jobstepidx);
1495
1496       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1497       {
1498         # If we get a lot of stderr without a newline, chop off the
1499         # front to avoid letting our buffer grow indefinitely.
1500         substr ($jobstep[$jobstepidx]->{stderr},
1501                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1502       }
1503     }
1504   }
1505   return $gotsome;
1506 }
1507
1508
1509 # Consume all full lines of stderr for a jobstep. Everything after the
1510 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1511 # returning.
1512 sub preprocess_stderr
1513 {
1514   my $jobstepidx = shift;
1515   # slotindex is only defined for children running Arvados job tasks.
1516   # Be prepared to handle the undef case (for setup srun calls, etc.).
1517   my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1518
1519   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1520     my $line = $1;
1521     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1522     Log ($jobstepidx, "stderr $line");
1523     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
1524       # If the allocation is revoked, we can't possibly continue, so mark all
1525       # nodes as failed.  This will cause the overall exit code to be
1526       # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
1527       # this job.
1528       $main::please_freeze = 1;
1529       foreach my $st (@slot) {
1530         $st->{node}->{fail_count}++;
1531       }
1532     }
1533     elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
1534       $jobstep[$jobstepidx]->{tempfail} = 1;
1535       if (defined($job_slot_index)) {
1536         $slot[$job_slot_index]->{node}->{fail_count}++;
1537         ban_node_by_slot($job_slot_index);
1538       }
1539     }
1540     elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
1541       $jobstep[$jobstepidx]->{tempfail} = 1;
1542       ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
1543     }
1544     elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1545       $jobstep[$jobstepidx]->{tempfail} = 1;
1546     }
1547   }
1548 }
1549
1550
1551 sub process_stderr_final
1552 {
1553   my $jobstepidx = shift;
1554   preprocess_stderr ($jobstepidx);
1555
1556   map {
1557     Log ($jobstepidx, "stderr $_");
1558   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1559   $jobstep[$jobstepidx]->{stderr} = '';
1560 }
1561
1562 sub fetch_block
1563 {
1564   my $hash = shift;
1565   my $keep;
1566   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1567     Log(undef, "fetch_block run error from arv-get $hash: $!");
1568     return undef;
1569   }
1570   my $output_block = "";
1571   while (1) {
1572     my $buf;
1573     my $bytes = sysread($keep, $buf, 1024 * 1024);
1574     if (!defined $bytes) {
1575       Log(undef, "fetch_block read error from arv-get: $!");
1576       $output_block = undef;
1577       last;
1578     } elsif ($bytes == 0) {
1579       # sysread returns 0 at the end of the pipe.
1580       last;
1581     } else {
1582       # some bytes were read into buf.
1583       $output_block .= $buf;
1584     }
1585   }
1586   close $keep;
1587   if ($?) {
1588     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1589     $output_block = undef;
1590   }
1591   return $output_block;
1592 }
1593
1594 # Create a collection by concatenating the output of all tasks (each
1595 # task's output is either a manifest fragment, a locator for a
1596 # manifest fragment stored in Keep, or nothing at all). Return the
1597 # portable_data_hash of the new collection.
1598 sub create_output_collection
1599 {
1600   Log (undef, "collate");
1601
1602   my ($child_out, $child_in);
1603   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1604 import arvados
1605 import sys
1606 print (arvados.api("v1").collections().
1607        create(body={"manifest_text": sys.stdin.read(),
1608                     "owner_uuid": sys.argv[2]}).
1609        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1610 }, retry_count(), $Job->{owner_uuid});
1611
1612   my $task_idx = -1;
1613   my $manifest_size = 0;
1614   for (@jobstep)
1615   {
1616     ++$task_idx;
1617     my $output = $_->{'arvados_task'}->{output};
1618     next if (!defined($output));
1619     my $next_write;
1620     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1621       $next_write = fetch_block($output);
1622     } else {
1623       $next_write = $output;
1624     }
1625     if (defined($next_write)) {
1626       if (!defined(syswrite($child_in, $next_write))) {
1627         # There's been an error writing.  Stop the loop.
1628         # We'll log details about the exit code later.
1629         last;
1630       } else {
1631         $manifest_size += length($next_write);
1632       }
1633     } else {
1634       my $uuid = $_->{'arvados_task'}->{'uuid'};
1635       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1636       $main::success = 0;
1637     }
1638   }
1639   close($child_in);
1640   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1641
1642   my $joboutput;
1643   my $s = IO::Select->new($child_out);
1644   if ($s->can_read(120)) {
1645     sysread($child_out, $joboutput, 1024 * 1024);
1646     waitpid($pid, 0);
1647     if ($?) {
1648       Log(undef, "output collection creation exited " . exit_status_s($?));
1649       $joboutput = undef;
1650     } else {
1651       chomp($joboutput);
1652     }
1653   } else {
1654     Log (undef, "timed out while creating output collection");
1655     foreach my $signal (2, 2, 2, 15, 15, 9) {
1656       kill($signal, $pid);
1657       last if waitpid($pid, WNOHANG) == -1;
1658       sleep(1);
1659     }
1660   }
1661   close($child_out);
1662
1663   return $joboutput;
1664 }
1665
1666 # Calls create_output_collection, logs the result, and returns it.
1667 # If that was successful, save that as the output in the job record.
1668 sub save_output_collection {
1669   my $collated_output = create_output_collection();
1670
1671   if (!$collated_output) {
1672     Log(undef, "Failed to write output collection");
1673   }
1674   else {
1675     Log(undef, "job output $collated_output");
1676     $Job->update_attributes('output' => $collated_output);
1677   }
1678   return $collated_output;
1679 }
1680
1681 sub killem
1682 {
1683   foreach (@_)
1684   {
1685     my $sig = 2;                # SIGINT first
1686     if (exists $proc{$_}->{"sent_$sig"} &&
1687         time - $proc{$_}->{"sent_$sig"} > 4)
1688     {
1689       $sig = 15;                # SIGTERM if SIGINT doesn't work
1690     }
1691     if (exists $proc{$_}->{"sent_$sig"} &&
1692         time - $proc{$_}->{"sent_$sig"} > 4)
1693     {
1694       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1695     }
1696     if (!exists $proc{$_}->{"sent_$sig"})
1697     {
1698       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1699       kill $sig, $_;
1700       select (undef, undef, undef, 0.1);
1701       if ($sig == 2)
1702       {
1703         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1704       }
1705       $proc{$_}->{"sent_$sig"} = time;
1706       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1707     }
1708   }
1709 }
1710
1711
1712 sub fhbits
1713 {
1714   my($bits);
1715   for (@_) {
1716     vec($bits,fileno($_),1) = 1;
1717   }
1718   $bits;
1719 }
1720
1721
1722 # Send log output to Keep via arv-put.
1723 #
1724 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1725 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1726 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1727 # $log_pipe_pid is the pid of the arv-put subprocess.
1728 #
1729 # The only functions that should access these variables directly are:
1730 #
1731 # log_writer_start($logfilename)
1732 #     Starts an arv-put pipe, reading data on stdin and writing it to
1733 #     a $logfilename file in an output collection.
1734 #
1735 # log_writer_read_output([$timeout])
1736 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1737 #     Passes $timeout to the select() call, with a default of 0.01.
1738 #     Returns the result of the last read() call on $log_pipe_out, or
1739 #     -1 if read() wasn't called because select() timed out.
1740 #     Only other log_writer_* functions should need to call this.
1741 #
1742 # log_writer_send($txt)
1743 #     Writes $txt to the output log collection.
1744 #
1745 # log_writer_finish()
1746 #     Closes the arv-put pipe and returns the output that it produces.
1747 #
1748 # log_writer_is_active()
1749 #     Returns a true value if there is currently a live arv-put
1750 #     process, false otherwise.
1751 #
1752 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1753     $log_pipe_pid);
1754
1755 sub log_writer_start($)
1756 {
1757   my $logfilename = shift;
1758   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1759                         'arv-put',
1760                         '--stream',
1761                         '--retries', '3',
1762                         '--filename', $logfilename,
1763                         '-');
1764   $log_pipe_out_buf = "";
1765   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1766 }
1767
1768 sub log_writer_read_output {
1769   my $timeout = shift || 0.01;
1770   my $read = -1;
1771   while ($read && $log_pipe_out_select->can_read($timeout)) {
1772     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1773                  length($log_pipe_out_buf));
1774   }
1775   if (!defined($read)) {
1776     Log(undef, "error reading log manifest from arv-put: $!");
1777   }
1778   return $read;
1779 }
1780
1781 sub log_writer_send($)
1782 {
1783   my $txt = shift;
1784   print $log_pipe_in $txt;
1785   log_writer_read_output();
1786 }
1787
1788 sub log_writer_finish()
1789 {
1790   return unless $log_pipe_pid;
1791
1792   close($log_pipe_in);
1793
1794   my $logger_failed = 0;
1795   my $read_result = log_writer_read_output(600);
1796   if ($read_result == -1) {
1797     $logger_failed = -1;
1798     Log (undef, "timed out reading from 'arv-put'");
1799   } elsif ($read_result != 0) {
1800     $logger_failed = -2;
1801     Log(undef, "failed to read arv-put log manifest to EOF");
1802   }
1803
1804   waitpid($log_pipe_pid, 0);
1805   if ($?) {
1806     $logger_failed ||= $?;
1807     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1808   }
1809
1810   close($log_pipe_out);
1811   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1812   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1813       $log_pipe_out_select = undef;
1814
1815   return $arv_put_output;
1816 }
1817
1818 sub log_writer_is_active() {
1819   return $log_pipe_pid;
1820 }
1821
1822 sub Log                         # ($jobstepidx, $logmessage)
1823 {
1824   my ($jobstepidx, $logmessage) = @_;
1825   if ($logmessage =~ /\n/) {
1826     for my $line (split (/\n/, $_[1])) {
1827       Log ($jobstepidx, $line);
1828     }
1829     return;
1830   }
1831   my $fh = select STDERR; $|=1; select $fh;
1832   my $task_qseq = '';
1833   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1834     $task_qseq = $jobstepidx;
1835   }
1836   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1837   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1838   $message .= "\n";
1839   my $datetime;
1840   if (log_writer_is_active() || -t STDERR) {
1841     my @gmtime = gmtime;
1842     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1843                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1844   }
1845   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1846
1847   if (log_writer_is_active()) {
1848     log_writer_send($datetime . " " . $message);
1849   }
1850 }
1851
1852
1853 sub croak
1854 {
1855   my ($package, $file, $line) = caller;
1856   my $message = "@_ at $file line $line\n";
1857   Log (undef, $message);
1858   release_allocation();
1859   freeze() if @jobstep_todo;
1860   create_output_collection() if @jobstep_todo;
1861   cleanup();
1862   save_meta();
1863   die;
1864 }
1865
1866
1867 sub cleanup
1868 {
1869   return unless $Job;
1870   if ($Job->{'state'} eq 'Cancelled') {
1871     $Job->update_attributes('finished_at' => scalar gmtime);
1872   } else {
1873     $Job->update_attributes('state' => 'Failed');
1874   }
1875 }
1876
1877
1878 sub save_meta
1879 {
1880   my $justcheckpoint = shift; # false if this will be the last meta saved
1881   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1882   return unless log_writer_is_active();
1883   my $log_manifest = log_writer_finish();
1884   return unless defined($log_manifest);
1885
1886   if ($Job->{log}) {
1887     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1888     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1889   }
1890
1891   my $log_coll = api_call(
1892     "collections/create", ensure_unique_name => 1, collection => {
1893       manifest_text => $log_manifest,
1894       owner_uuid => $Job->{owner_uuid},
1895       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1896     });
1897   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1898   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1899 }
1900
1901
1902 sub freeze_if_want_freeze
1903 {
1904   if ($main::please_freeze)
1905   {
1906     release_allocation();
1907     if (@_)
1908     {
1909       # kill some srun procs before freeze+stop
1910       map { $proc{$_} = {} } @_;
1911       while (%proc)
1912       {
1913         killem (keys %proc);
1914         select (undef, undef, undef, 0.1);
1915         my $died;
1916         while (($died = waitpid (-1, WNOHANG)) > 0)
1917         {
1918           delete $proc{$died};
1919         }
1920       }
1921     }
1922     freeze();
1923     create_output_collection();
1924     cleanup();
1925     save_meta();
1926     exit 1;
1927   }
1928 }
1929
1930
1931 sub freeze
1932 {
1933   Log (undef, "Freeze not implemented");
1934   return;
1935 }
1936
1937
1938 sub thaw
1939 {
1940   croak ("Thaw not implemented");
1941 }
1942
1943
1944 sub freezequote
1945 {
1946   my $s = shift;
1947   $s =~ s/\\/\\\\/g;
1948   $s =~ s/\n/\\n/g;
1949   return $s;
1950 }
1951
1952
1953 sub freezeunquote
1954 {
1955   my $s = shift;
1956   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1957   return $s;
1958 }
1959
1960 sub srun_sync
1961 {
1962   my $srunargs = shift;
1963   my $execargs = shift;
1964   my $opts = shift || {};
1965   my $stdin = shift;
1966
1967   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1968   Log (undef, "$label: start");
1969
1970   my ($stderr_r, $stderr_w);
1971   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1972
1973   my ($stdout_r, $stdout_w);
1974   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1975
1976   my $srunpid = fork();
1977   if ($srunpid == 0)
1978   {
1979     close($stderr_r);
1980     close($stdout_r);
1981     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1982     fcntl($stdout_w, F_SETFL, 0) or croak($!);
1983     open(STDERR, ">&", $stderr_w);
1984     open(STDOUT, ">&", $stdout_w);
1985     srun ($srunargs, $execargs, $opts, $stdin);
1986     exit (1);
1987   }
1988   close($stderr_w);
1989   close($stdout_w);
1990
1991   set_nonblocking($stderr_r);
1992   set_nonblocking($stdout_r);
1993
1994   # Add entries to @jobstep and %proc so check_squeue() and
1995   # freeze_if_want_freeze() can treat it like a job task process.
1996   push @jobstep, {
1997     stderr => '',
1998     stderr_at => 0,
1999     stderr_captured => '',
2000     stdout_r => $stdout_r,
2001     stdout_captured => '',
2002   };
2003   my $jobstepidx = $#jobstep;
2004   $proc{$srunpid} = {
2005     jobstepidx => $jobstepidx,
2006   };
2007   $reader{$jobstepidx} = $stderr_r;
2008
2009   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
2010     my $busy = readfrompipes();
2011     if (!$busy || ($latest_refresh + 2 < scalar time)) {
2012       check_refresh_wanted();
2013       check_squeue();
2014       check_sinfo();
2015     }
2016     if (!$busy) {
2017       select(undef, undef, undef, 0.1);
2018     }
2019     killem(keys %proc) if $main::please_freeze;
2020   }
2021   my $exited = $?;
2022
2023   1 while readfrompipes();
2024   process_stderr_final ($jobstepidx);
2025
2026   Log (undef, "$label: exit ".exit_status_s($exited));
2027
2028   close($stdout_r);
2029   close($stderr_r);
2030   delete $proc{$srunpid};
2031   delete $reader{$jobstepidx};
2032
2033   my $j = pop @jobstep;
2034   # If the srun showed signs of tempfail, ensure the caller treats that as a
2035   # failure case.
2036   if ($main::please_freeze || $j->{tempfail}) {
2037     $exited ||= 255;
2038   }
2039   return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
2040 }
2041
2042
2043 sub srun
2044 {
2045   my $srunargs = shift;
2046   my $execargs = shift;
2047   my $opts = shift || {};
2048   my $stdin = shift;
2049   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
2050
2051   $Data::Dumper::Terse = 1;
2052   $Data::Dumper::Indent = 0;
2053   my $show_cmd = Dumper($args);
2054   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2055   $show_cmd =~ s/\n/ /g;
2056   if ($opts->{fork}) {
2057     Log(undef, "starting: $show_cmd");
2058   } else {
2059     # This is a child process: parent is in charge of reading our
2060     # stderr and copying it to Log() if needed.
2061     warn "starting: $show_cmd\n";
2062   }
2063
2064   if (defined $stdin) {
2065     my $child = open STDIN, "-|";
2066     defined $child or die "no fork: $!";
2067     if ($child == 0) {
2068       print $stdin or die $!;
2069       close STDOUT or die $!;
2070       exit 0;
2071     }
2072   }
2073
2074   return system (@$args) if $opts->{fork};
2075
2076   exec @$args;
2077   warn "ENV size is ".length(join(" ",%ENV));
2078   die "exec failed: $!: @$args";
2079 }
2080
2081
2082 sub ban_node_by_slot {
2083   # Don't start any new jobsteps on this node for 60 seconds
2084   my $slotid = shift;
2085   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2086   $slot[$slotid]->{node}->{hold_count}++;
2087   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2088 }
2089
2090 sub must_lock_now
2091 {
2092   my ($lockfile, $error_message) = @_;
2093   open L, ">", $lockfile or croak("$lockfile: $!");
2094   if (!flock L, LOCK_EX|LOCK_NB) {
2095     croak("Can't lock $lockfile: $error_message\n");
2096   }
2097 }
2098
2099 sub find_docker_image {
2100   # Given a Keep locator, check to see if it contains a Docker image.
2101   # If so, return its stream name and Docker hash.
2102   # If not, return undef for both values.
2103   my $locator = shift;
2104   my ($streamname, $filename);
2105   my $image = api_call("collections/get", uuid => $locator);
2106   if ($image) {
2107     foreach my $line (split(/\n/, $image->{manifest_text})) {
2108       my @tokens = split(/\s+/, $line);
2109       next if (!@tokens);
2110       $streamname = shift(@tokens);
2111       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2112         if (defined($filename)) {
2113           return (undef, undef);  # More than one file in the Collection.
2114         } else {
2115           $filename = (split(/:/, $filedata, 3))[2];
2116         }
2117       }
2118     }
2119   }
2120   if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
2121     return ($streamname, $1);
2122   } else {
2123     return (undef, undef);
2124   }
2125 }
2126
2127 sub retry_count {
2128   # Calculate the number of times an operation should be retried,
2129   # assuming exponential backoff, and that we're willing to retry as
2130   # long as tasks have been running.  Enforce a minimum of 3 retries.
2131   my ($starttime, $endtime, $timediff, $retries);
2132   if (@jobstep) {
2133     $starttime = $jobstep[0]->{starttime};
2134     $endtime = $jobstep[-1]->{finishtime};
2135   }
2136   if (!defined($starttime)) {
2137     $timediff = 0;
2138   } elsif (!defined($endtime)) {
2139     $timediff = time - $starttime;
2140   } else {
2141     $timediff = ($endtime - $starttime) - (time - $endtime);
2142   }
2143   if ($timediff > 0) {
2144     $retries = int(log($timediff) / log(2));
2145   } else {
2146     $retries = 1;  # Use the minimum.
2147   }
2148   return ($retries > 3) ? $retries : 3;
2149 }
2150
2151 sub retry_op {
2152   # Pass in two function references.
2153   # This method will be called with the remaining arguments.
2154   # If it dies, retry it with exponential backoff until it succeeds,
2155   # or until the current retry_count is exhausted.  After each failure
2156   # that can be retried, the second function will be called with
2157   # the current try count (0-based), next try time, and error message.
2158   my $operation = shift;
2159   my $retry_callback = shift;
2160   my $retries = retry_count();
2161   foreach my $try_count (0..$retries) {
2162     my $next_try = time + (2 ** $try_count);
2163     my $result = eval { $operation->(@_); };
2164     if (!$@) {
2165       return $result;
2166     } elsif ($try_count < $retries) {
2167       $retry_callback->($try_count, $next_try, $@);
2168       my $sleep_time = $next_try - time;
2169       sleep($sleep_time) if ($sleep_time > 0);
2170     }
2171   }
2172   # Ensure the error message ends in a newline, so Perl doesn't add
2173   # retry_op's line number to it.
2174   chomp($@);
2175   die($@ . "\n");
2176 }
2177
2178 sub api_call {
2179   # Pass in a /-separated API method name, and arguments for it.
2180   # This function will call that method, retrying as needed until
2181   # the current retry_count is exhausted, with a log on the first failure.
2182   my $method_name = shift;
2183   my $log_api_retry = sub {
2184     my ($try_count, $next_try_at, $errmsg) = @_;
2185     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2186     $errmsg =~ s/\s/ /g;
2187     $errmsg =~ s/\s+$//;
2188     my $retry_msg;
2189     if ($next_try_at < time) {
2190       $retry_msg = "Retrying.";
2191     } else {
2192       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2193       $retry_msg = "Retrying at $next_try_fmt.";
2194     }
2195     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2196   };
2197   my $method = $arv;
2198   foreach my $key (split(/\//, $method_name)) {
2199     $method = $method->{$key};
2200   }
2201   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2202 }
2203
2204 sub exit_status_s {
2205   # Given a $?, return a human-readable exit code string like "0" or
2206   # "1" or "0 with signal 1" or "1 with signal 11".
2207   my $exitcode = shift;
2208   my $s = $exitcode >> 8;
2209   if ($exitcode & 0x7f) {
2210     $s .= " with signal " . ($exitcode & 0x7f);
2211   }
2212   if ($exitcode & 0x80) {
2213     $s .= " with core dump";
2214   }
2215   return $s;
2216 }
2217
2218 sub handle_readall {
2219   # Pass in a glob reference to a file handle.
2220   # Read all its contents and return them as a string.
2221   my $fh_glob_ref = shift;
2222   local $/ = undef;
2223   return <$fh_glob_ref>;
2224 }
2225
2226 sub tar_filename_n {
2227   my $n = shift;
2228   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2229 }
2230
2231 sub add_git_archive {
2232   # Pass in a git archive command as a string or list, a la system().
2233   # This method will save its output to be included in the archive sent to the
2234   # build script.
2235   my $git_input;
2236   $git_tar_count++;
2237   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2238     croak("Failed to save git archive: $!");
2239   }
2240   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2241   close($git_input);
2242   waitpid($git_pid, 0);
2243   close(GIT_ARCHIVE);
2244   if ($?) {
2245     croak("Failed to save git archive: git exited " . exit_status_s($?));
2246   }
2247 }
2248
2249 sub combined_git_archive {
2250   # Combine all saved tar archives into a single archive, then return its
2251   # contents in a string.  Return undef if no archives have been saved.
2252   if ($git_tar_count < 1) {
2253     return undef;
2254   }
2255   my $base_tar_name = tar_filename_n(1);
2256   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2257     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2258     if ($tar_exit != 0) {
2259       croak("Error preparing build archive: tar -A exited " .
2260             exit_status_s($tar_exit));
2261     }
2262   }
2263   if (!open(GIT_TAR, "<", $base_tar_name)) {
2264     croak("Could not open build archive: $!");
2265   }
2266   my $tar_contents = handle_readall(\*GIT_TAR);
2267   close(GIT_TAR);
2268   return $tar_contents;
2269 }
2270
2271 sub set_nonblocking {
2272   my $fh = shift;
2273   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2274   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2275 }
2276
2277 __DATA__
2278 #!/usr/bin/env perl
2279 #
2280 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2281 # server invokes this script on individual compute nodes, or localhost if we're
2282 # running a job locally.  It gets called in two modes:
2283 #
2284 # * No arguments: Installation mode.  Read a tar archive from the DATA
2285 #   file handle; it includes the Crunch script's source code, and
2286 #   maybe SDKs as well.  Those should be installed in the proper
2287 #   locations.  This runs outside of any Docker container, so don't try to
2288 #   introspect Crunch's runtime environment.
2289 #
2290 # * With arguments: Crunch script run mode.  This script should set up the
2291 #   environment, then run the command specified in the arguments.  This runs
2292 #   inside any Docker container.
2293
2294 use Fcntl ':flock';
2295 use File::Path qw( make_path remove_tree );
2296 use POSIX qw(getcwd);
2297
2298 use constant TASK_TEMPFAIL => 111;
2299
2300 # Map SDK subdirectories to the path environments they belong to.
2301 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2302
2303 my $destdir = $ENV{"CRUNCH_SRC"};
2304 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2305 my $repo = $ENV{"CRUNCH_SRC_URL"};
2306 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2307 my $job_work = $ENV{"JOB_WORK"};
2308 my $task_work = $ENV{"TASK_WORK"};
2309
2310 open(STDOUT_ORIG, ">&", STDOUT);
2311 open(STDERR_ORIG, ">&", STDERR);
2312
2313 for my $dir ($destdir, $job_work, $task_work) {
2314   if ($dir) {
2315     make_path $dir;
2316     -e $dir or die "Failed to create temporary directory ($dir): $!";
2317   }
2318 }
2319
2320 if ($task_work) {
2321   remove_tree($task_work, {keep_root => 1});
2322 }
2323
2324 ### Crunch script run mode
2325 if (@ARGV) {
2326   # We want to do routine logging during task 0 only.  This gives the user
2327   # the information they need, but avoids repeating the information for every
2328   # task.
2329   my $Log;
2330   if ($ENV{TASK_SEQUENCE} eq "0") {
2331     $Log = sub {
2332       my $msg = shift;
2333       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2334     };
2335   } else {
2336     $Log = sub { };
2337   }
2338
2339   my $python_src = "$install_dir/python";
2340   my $venv_dir = "$job_work/.arvados.venv";
2341   my $venv_built = -e "$venv_dir/bin/activate";
2342   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2343     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2344                  "--python=python2.7", $venv_dir);
2345     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2346     $venv_built = 1;
2347     $Log->("Built Python SDK virtualenv");
2348   }
2349
2350   my @pysdk_version_cmd = ("python", "-c",
2351     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2352   if ($venv_built) {
2353     $Log->("Running in Python SDK virtualenv");
2354     @pysdk_version_cmd = ();
2355     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2356     @ARGV = ("/bin/sh", "-ec",
2357              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2358   } elsif (-d $python_src) {
2359     $Log->("Warning: virtualenv not found inside Docker container default " .
2360            "\$PATH. Can't install Python SDK.");
2361   }
2362
2363   if (@pysdk_version_cmd) {
2364     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2365     my $pysdk_version = <$pysdk_version_pipe>;
2366     close($pysdk_version_pipe);
2367     if ($? == 0) {
2368       chomp($pysdk_version);
2369       $Log->("Using Arvados SDK version $pysdk_version");
2370     } else {
2371       # A lot could've gone wrong here, but pretty much all of it means that
2372       # Python won't be able to load the Arvados SDK.
2373       $Log->("Warning: Arvados SDK not found");
2374     }
2375   }
2376
2377   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2378     my $sdk_path = "$install_dir/$sdk_dir";
2379     if (-d $sdk_path) {
2380       if ($ENV{$sdk_envkey}) {
2381         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2382       } else {
2383         $ENV{$sdk_envkey} = $sdk_path;
2384       }
2385       $Log->("Arvados SDK added to %s", $sdk_envkey);
2386     }
2387   }
2388
2389   exec(@ARGV);
2390   die "Cannot exec `@ARGV`: $!";
2391 }
2392
2393 ### Installation mode
2394 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2395 flock L, LOCK_EX;
2396 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2397   # This exact git archive (source + arvados sdk) is already installed
2398   # here, so there's no need to reinstall it.
2399
2400   # We must consume our DATA section, though: otherwise the process
2401   # feeding it to us will get SIGPIPE.
2402   my $buf;
2403   while (read(DATA, $buf, 65536)) { }
2404
2405   exit(0);
2406 }
2407
2408 unlink "$destdir.archive_hash";
2409 mkdir $destdir;
2410
2411 do {
2412   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2413   local $SIG{PIPE} = "IGNORE";
2414   warn "Extracting archive: $archive_hash\n";
2415   # --ignore-zeros is necessary sometimes: depending on how much NUL
2416   # padding tar -A put on our combined archive (which in turn depends
2417   # on the length of the component archives) tar without
2418   # --ignore-zeros will exit before consuming stdin and cause close()
2419   # to fail on the resulting SIGPIPE.
2420   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2421     die "Error launching 'tar -xC $destdir': $!";
2422   }
2423   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2424   # get SIGPIPE.  We must feed it data incrementally.
2425   my $tar_input;
2426   while (read(DATA, $tar_input, 65536)) {
2427     print TARX $tar_input;
2428   }
2429   if(!close(TARX)) {
2430     die "'tar -xC $destdir' exited $?: $!";
2431   }
2432 };
2433
2434 mkdir $install_dir;
2435
2436 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2437 if (-d $sdk_root) {
2438   foreach my $sdk_lang (("python",
2439                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2440     if (-d "$sdk_root/$sdk_lang") {
2441       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2442         die "Failed to install $sdk_lang SDK: $!";
2443       }
2444     }
2445   }
2446 }
2447
2448 my $python_dir = "$install_dir/python";
2449 if ((-d $python_dir) and can_run("python2.7")) {
2450   open(my $egg_info_pipe, "-|",
2451        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2452   my @egg_info_errors = <$egg_info_pipe>;
2453   close($egg_info_pipe);
2454
2455   if ($?) {
2456     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2457       # egg_info apparently failed because it couldn't ask git for a build tag.
2458       # Specify no build tag.
2459       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2460       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2461       close($pysdk_cfg);
2462     } else {
2463       my $egg_info_exit = $? >> 8;
2464       foreach my $errline (@egg_info_errors) {
2465         warn $errline;
2466       }
2467       warn "python setup.py egg_info failed: exit $egg_info_exit";
2468       exit ($egg_info_exit || 1);
2469     }
2470   }
2471 }
2472
2473 # Hide messages from the install script (unless it fails: shell_or_die
2474 # will show $destdir.log in that case).
2475 open(STDOUT, ">>", "$destdir.log");
2476 open(STDERR, ">&", STDOUT);
2477
2478 if (-e "$destdir/crunch_scripts/install") {
2479     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2480 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2481     # Old version
2482     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2483 } elsif (-e "./install.sh") {
2484     shell_or_die (undef, "./install.sh", $install_dir);
2485 }
2486
2487 if ($archive_hash) {
2488     unlink "$destdir.archive_hash.new";
2489     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2490     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2491 }
2492
2493 close L;
2494
2495 sub can_run {
2496   my $command_name = shift;
2497   open(my $which, "-|", "which", $command_name);
2498   while (<$which>) { }
2499   close($which);
2500   return ($? == 0);
2501 }
2502
2503 sub shell_or_die
2504 {
2505   my $exitcode = shift;
2506
2507   if ($ENV{"DEBUG"}) {
2508     print STDERR "@_\n";
2509   }
2510   if (system (@_) != 0) {
2511     my $err = $!;
2512     my $code = $?;
2513     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2514     open STDERR, ">&STDERR_ORIG";
2515     system ("cat $destdir.log >&2");
2516     warn "@_ failed ($err): $exitstatus";
2517     if (defined($exitcode)) {
2518       exit $exitcode;
2519     }
2520     else {
2521       exit (($code >> 8) || 1);
2522     }
2523   }
2524 }
2525
2526 __DATA__