2411: Fix crunch-job license statement.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
7
8 =head1 NAME
9
10 crunch-job: Execute job steps, save snapshots as requested, collate output.
11
12 =head1 SYNOPSIS
13
14 Obtain job details from Arvados, run tasks on compute nodes (typically
15 invoked by scheduler on controller):
16
17  crunch-job --job x-y-z --git-dir /path/to/repo/.git
18
19 Obtain job details from command line, run tasks on local machine
20 (typically invoked by application or developer on VM):
21
22  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
23
24  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
25
26 =head1 OPTIONS
27
28 =over
29
30 =item --force-unlock
31
32 If the job is already locked, steal the lock and run it anyway.
33
34 =item --git-dir
35
36 Path to a .git directory (or a git URL) where the commit given in the
37 job's C<script_version> attribute is to be found. If this is I<not>
38 given, the job's C<repository> attribute will be used.
39
40 =item --job-api-token
41
42 Arvados API authorization token to use during the course of the job.
43
44 =item --no-clear-tmp
45
46 Do not clear per-job/task temporary directories during initial job
47 setup. This can speed up development and debugging when running jobs
48 locally.
49
50 =item --job
51
52 UUID of the job to run, or a JSON-encoded job resource without a
53 UUID. If the latter is given, a new job object will be created.
54
55 =back
56
57 =head1 RUNNING JOBS LOCALLY
58
59 crunch-job's log messages appear on stderr along with the job tasks'
60 stderr streams. The log is saved in Keep at each checkpoint and when
61 the job finishes.
62
63 If the job succeeds, the job's output locator is printed on stdout.
64
65 While the job is running, the following signals are accepted:
66
67 =over
68
69 =item control-C, SIGINT, SIGQUIT
70
71 Save a checkpoint, terminate any job tasks that are running, and stop.
72
73 =item SIGALRM
74
75 Save a checkpoint and continue.
76
77 =item SIGHUP
78
79 Refresh node allocation (i.e., check whether any nodes have been added
80 or unallocated) and attributes of the Job record that should affect
81 behavior (e.g., cancel job if cancelled_at becomes non-nil).
82
83 =back
84
85 =cut
86
87
88 use strict;
89 use POSIX ':sys_wait_h';
90 use POSIX qw(strftime);
91 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
92 use Arvados;
93 use Cwd qw(realpath);
94 use Data::Dumper;
95 use Digest::MD5 qw(md5_hex);
96 use Getopt::Long;
97 use IPC::Open2;
98 use IO::Select;
99 use File::Temp;
100 use Fcntl ':flock';
101 use File::Path qw( make_path remove_tree );
102
103 use constant TASK_TEMPFAIL => 111;
104 use constant EX_TEMPFAIL => 75;
105 use constant EX_RETRY_UNLOCKED => 93;
106
107 $ENV{"TMPDIR"} ||= "/tmp";
108 unless (defined $ENV{"CRUNCH_TMP"}) {
109   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
110   if ($ENV{"USER"} ne "crunch" && $< != 0) {
111     # use a tmp dir unique for my uid
112     $ENV{"CRUNCH_TMP"} .= "-$<";
113   }
114 }
115
116 # Create the tmp directory if it does not exist
117 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
118   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
119 }
120
121 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
122 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
123 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
124 mkdir ($ENV{"JOB_WORK"});
125
126 my %proc;
127 my $force_unlock;
128 my $git_dir;
129 my $jobspec;
130 my $job_api_token;
131 my $no_clear_tmp;
132 my $resume_stash;
133 my $cgroup_root = "/sys/fs/cgroup";
134 my $docker_bin = "docker.io";
135 my $docker_run_args = "";
136 GetOptions('force-unlock' => \$force_unlock,
137            'git-dir=s' => \$git_dir,
138            'job=s' => \$jobspec,
139            'job-api-token=s' => \$job_api_token,
140            'no-clear-tmp' => \$no_clear_tmp,
141            'resume-stash=s' => \$resume_stash,
142            'cgroup-root=s' => \$cgroup_root,
143            'docker-bin=s' => \$docker_bin,
144            'docker-run-args=s' => \$docker_run_args,
145     );
146
147 if (defined $job_api_token) {
148   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
149 }
150
151 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
152
153
154 $SIG{'USR1'} = sub
155 {
156   $main::ENV{CRUNCH_DEBUG} = 1;
157 };
158 $SIG{'USR2'} = sub
159 {
160   $main::ENV{CRUNCH_DEBUG} = 0;
161 };
162
163 my $arv = Arvados->new('apiVersion' => 'v1');
164
165 my $Job;
166 my $job_id;
167 my $dbh;
168 my $sth;
169 my @jobstep;
170
171 my $local_job;
172 if ($jobspec =~ /^[-a-z\d]+$/)
173 {
174   # $jobspec is an Arvados UUID, not a JSON job specification
175   $Job = api_call("jobs/get", uuid => $jobspec);
176   $local_job = 0;
177 }
178 else
179 {
180   $local_job = JSON::decode_json($jobspec);
181 }
182
183
184 # Make sure our workers (our slurm nodes, localhost, or whatever) are
185 # at least able to run basic commands: they aren't down or severely
186 # misconfigured.
187 my $cmd = ['true'];
188 if (($Job || $local_job)->{docker_image_locator}) {
189   $cmd = [$docker_bin, 'ps', '-q'];
190 }
191 Log(undef, "Sanity check is `@$cmd`");
192 my ($exited, $stdout, $stderr) = srun_sync(
193   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
194   $cmd,
195   {label => "sanity check"});
196 if ($exited != 0) {
197   Log(undef, "Sanity check failed: ".exit_status_s($exited));
198   exit EX_TEMPFAIL;
199 }
200 Log(undef, "Sanity check OK");
201
202
203 my $User = api_call("users/current");
204
205 if (!$local_job) {
206   if (!$force_unlock) {
207     # Claim this job, and make sure nobody else does
208     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
209     if ($@) {
210       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
211       exit EX_TEMPFAIL;
212     };
213   }
214 }
215 else
216 {
217   if (!$resume_stash)
218   {
219     map { croak ("No $_ specified") unless $local_job->{$_} }
220     qw(script script_version script_parameters);
221   }
222
223   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
224   $local_job->{'started_at'} = gmtime;
225   $local_job->{'state'} = 'Running';
226
227   $Job = api_call("jobs/create", job => $local_job);
228 }
229 $job_id = $Job->{'uuid'};
230
231 my $keep_logfile = $job_id . '.log.txt';
232 log_writer_start($keep_logfile);
233
234 $Job->{'runtime_constraints'} ||= {};
235 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
236 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
237
238 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
239 if ($? == 0) {
240   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
241   chomp($gem_versions);
242   chop($gem_versions);  # Closing parentheses
243 } else {
244   $gem_versions = "";
245 }
246 Log(undef,
247     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
248
249 Log (undef, "check slurm allocation");
250 my @slot;
251 my @node;
252 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
253 my @sinfo;
254 if (!$have_slurm)
255 {
256   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
257   push @sinfo, "$localcpus localhost";
258 }
259 if (exists $ENV{SLURM_NODELIST})
260 {
261   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
262 }
263 foreach (@sinfo)
264 {
265   my ($ncpus, $slurm_nodelist) = split;
266   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
267
268   my @nodelist;
269   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
270   {
271     my $nodelist = $1;
272     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
273     {
274       my $ranges = $1;
275       foreach (split (",", $ranges))
276       {
277         my ($a, $b);
278         if (/(\d+)-(\d+)/)
279         {
280           $a = $1;
281           $b = $2;
282         }
283         else
284         {
285           $a = $_;
286           $b = $_;
287         }
288         push @nodelist, map {
289           my $n = $nodelist;
290           $n =~ s/\[[-,\d]+\]/$_/;
291           $n;
292         } ($a..$b);
293       }
294     }
295     else
296     {
297       push @nodelist, $nodelist;
298     }
299   }
300   foreach my $nodename (@nodelist)
301   {
302     Log (undef, "node $nodename - $ncpus slots");
303     my $node = { name => $nodename,
304                  ncpus => $ncpus,
305                  # The number of consecutive times a task has been dispatched
306                  # to this node and failed.
307                  losing_streak => 0,
308                  # The number of consecutive times that SLURM has reported
309                  # a node failure since the last successful task.
310                  fail_count => 0,
311                  # Don't dispatch work to this node until this time
312                  # (in seconds since the epoch) has passed.
313                  hold_until => 0 };
314     foreach my $cpu (1..$ncpus)
315     {
316       push @slot, { node => $node,
317                     cpu => $cpu };
318     }
319   }
320   push @node, @nodelist;
321 }
322
323
324
325 # Ensure that we get one jobstep running on each allocated node before
326 # we start overloading nodes with concurrent steps
327
328 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
329
330
331 $Job->update_attributes(
332   'tasks_summary' => { 'failed' => 0,
333                        'todo' => 1,
334                        'running' => 0,
335                        'done' => 0 });
336
337 Log (undef, "start");
338 $SIG{'INT'} = sub { $main::please_freeze = 1; };
339 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
340 $SIG{'TERM'} = \&croak;
341 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
342 $SIG{'ALRM'} = sub { $main::please_info = 1; };
343 $SIG{'CONT'} = sub { $main::please_continue = 1; };
344 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
345
346 $main::please_freeze = 0;
347 $main::please_info = 0;
348 $main::please_continue = 0;
349 $main::please_refresh = 0;
350 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
351
352 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
353 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
354 $ENV{"JOB_UUID"} = $job_id;
355
356
357 my @jobstep_todo = ();
358 my @jobstep_done = ();
359 my @jobstep_tomerge = ();
360 my $jobstep_tomerge_level = 0;
361 my $squeue_checked = 0;
362 my $sinfo_checked = 0;
363 my $latest_refresh = scalar time;
364
365
366
367 if (defined $Job->{thawedfromkey})
368 {
369   thaw ($Job->{thawedfromkey});
370 }
371 else
372 {
373   my $first_task = api_call("job_tasks/create", job_task => {
374     'job_uuid' => $Job->{'uuid'},
375     'sequence' => 0,
376     'qsequence' => 0,
377     'parameters' => {},
378   });
379   push @jobstep, { 'level' => 0,
380                    'failures' => 0,
381                    'arvados_task' => $first_task,
382                  };
383   push @jobstep_todo, 0;
384 }
385
386
387 if (!$have_slurm)
388 {
389   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
390 }
391
392 my $build_script = handle_readall(\*DATA);
393 my $nodelist = join(",", @node);
394 my $git_tar_count = 0;
395
396 if (!defined $no_clear_tmp) {
397   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
398   # up work directories crunch_tmp/work, crunch_tmp/opt,
399   # crunch_tmp/src*.
400   my ($exited, $stdout, $stderr) = srun_sync(
401     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
402     ['bash', '-ec', q{
403 arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
404 rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
405     }],
406     {label => "clean work dirs"});
407   if ($exited != 0) {
408     exit(EX_RETRY_UNLOCKED);
409   }
410 }
411
412 # If this job requires a Docker image, install that.
413 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
414 if ($docker_locator = $Job->{docker_image_locator}) {
415   Log (undef, "Install docker image $docker_locator");
416   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
417   if (!$docker_hash)
418   {
419     croak("No Docker image hash found from locator $docker_locator");
420   }
421   Log (undef, "docker image hash is $docker_hash");
422   $docker_stream =~ s/^\.//;
423   my $docker_install_script = qq{
424 loaded() {
425   id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
426   echo "image ID is \$id"
427   [[ \${id} = \Q$docker_hash\E ]]
428 }
429 if loaded >&2 2>/dev/null; then
430   echo >&2 "image is already present"
431   exit 0
432 fi
433 echo >&2 "docker image is not present; loading"
434 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
435 if ! loaded >&2; then
436   echo >&2 "`docker load` exited 0, but image is not found (!)"
437   exit 1
438 fi
439 echo >&2 "image loaded successfully"
440 };
441
442   my ($exited, $stdout, $stderr) = srun_sync(
443     ["srun", "--nodelist=" . join(',', @node)],
444     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
445     {label => "load docker image"});
446   if ($exited != 0)
447   {
448     exit(EX_RETRY_UNLOCKED);
449   }
450
451   # Determine whether this version of Docker supports memory+swap limits.
452   ($exited, $stdout, $stderr) = srun_sync(
453     ["srun", "--nodes=1"],
454     [$docker_bin, 'run', '--help'],
455     {label => "check --memory-swap feature"});
456   $docker_limitmem = ($stdout =~ /--memory-swap/);
457
458   # Find a non-root Docker user to use.
459   # Tries the default user for the container, then 'crunch', then 'nobody',
460   # testing for whether the actual user id is non-zero.  This defends against
461   # mistakes but not malice, but we intend to harden the security in the future
462   # so we don't want anyone getting used to their jobs running as root in their
463   # Docker containers.
464   my @tryusers = ("", "crunch", "nobody");
465   foreach my $try_user (@tryusers) {
466     my $label;
467     my $try_user_arg;
468     if ($try_user eq "") {
469       $label = "check whether default user is UID 0";
470       $try_user_arg = "";
471     } else {
472       $label = "check whether user '$try_user' is UID 0";
473       $try_user_arg = "--user=$try_user";
474     }
475     my ($exited, $stdout, $stderr) = srun_sync(
476       ["srun", "--nodes=1"],
477       ["/bin/sh", "-ec",
478        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
479       {label => $label});
480     chomp($stdout);
481     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
482       $dockeruserarg = $try_user_arg;
483       if ($try_user eq "") {
484         Log(undef, "Container will run with default user");
485       } else {
486         Log(undef, "Container will run with $dockeruserarg");
487       }
488       last;
489     }
490   }
491
492   if (!defined $dockeruserarg) {
493     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
494   }
495
496   if ($Job->{arvados_sdk_version}) {
497     # The job also specifies an Arvados SDK version.  Add the SDKs to the
498     # tar file for the build script to install.
499     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
500                        $Job->{arvados_sdk_version}));
501     add_git_archive("git", "--git-dir=$git_dir", "archive",
502                     "--prefix=.arvados.sdk/",
503                     $Job->{arvados_sdk_version}, "sdk");
504   }
505 }
506
507 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
508   # If script_version looks like an absolute path, *and* the --git-dir
509   # argument was not given -- which implies we were not invoked by
510   # crunch-dispatch -- we will use the given path as a working
511   # directory instead of resolving script_version to a git commit (or
512   # doing anything else with git).
513   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
514   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
515 }
516 else {
517   # Resolve the given script_version to a git commit sha1. Also, if
518   # the repository is remote, clone it into our local filesystem: this
519   # ensures "git archive" will work, and is necessary to reliably
520   # resolve a symbolic script_version like "master^".
521   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
522
523   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
524
525   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
526
527   # If we're running under crunch-dispatch, it will have already
528   # pulled the appropriate source tree into its own repository, and
529   # given us that repo's path as $git_dir.
530   #
531   # If we're running a "local" job, we might have to fetch content
532   # from a remote repository.
533   #
534   # (Currently crunch-dispatch gives a local path with --git-dir, but
535   # we might as well accept URLs there too in case it changes its
536   # mind.)
537   my $repo = $git_dir || $Job->{'repository'};
538
539   # Repository can be remote or local. If remote, we'll need to fetch it
540   # to a local dir before doing `git log` et al.
541   my $repo_location;
542
543   if ($repo =~ m{://|^[^/]*:}) {
544     # $repo is a git url we can clone, like git:// or https:// or
545     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
546     # not recognized here because distinguishing that from a local
547     # path is too fragile. If you really need something strange here,
548     # use the ssh:// form.
549     $repo_location = 'remote';
550   } elsif ($repo =~ m{^\.*/}) {
551     # $repo is a local path to a git index. We'll also resolve ../foo
552     # to ../foo/.git if the latter is a directory. To help
553     # disambiguate local paths from named hosted repositories, this
554     # form must be given as ./ or ../ if it's a relative path.
555     if (-d "$repo/.git") {
556       $repo = "$repo/.git";
557     }
558     $repo_location = 'local';
559   } else {
560     # $repo is none of the above. It must be the name of a hosted
561     # repository.
562     my $arv_repo_list = api_call("repositories/list",
563                                  'filters' => [['name','=',$repo]]);
564     my @repos_found = @{$arv_repo_list->{'items'}};
565     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
566     if ($n_found > 0) {
567       Log(undef, "Repository '$repo' -> "
568           . join(", ", map { $_->{'uuid'} } @repos_found));
569     }
570     if ($n_found != 1) {
571       croak("Error: Found $n_found repositories with name '$repo'.");
572     }
573     $repo = $repos_found[0]->{'fetch_url'};
574     $repo_location = 'remote';
575   }
576   Log(undef, "Using $repo_location repository '$repo'");
577   $ENV{"CRUNCH_SRC_URL"} = $repo;
578
579   # Resolve given script_version (we'll call that $treeish here) to a
580   # commit sha1 ($commit).
581   my $treeish = $Job->{'script_version'};
582   my $commit;
583   if ($repo_location eq 'remote') {
584     # We minimize excess object-fetching by re-using the same bare
585     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
586     # just keep adding remotes to it as needed.
587     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
588     my $gitcmd = "git --git-dir=\Q$local_repo\E";
589
590     # Set up our local repo for caching remote objects, making
591     # archives, etc.
592     if (!-d $local_repo) {
593       make_path($local_repo) or croak("Error: could not create $local_repo");
594     }
595     # This works (exits 0 and doesn't delete fetched objects) even
596     # if $local_repo is already initialized:
597     `$gitcmd init --bare`;
598     if ($?) {
599       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
600     }
601
602     # If $treeish looks like a hash (or abbrev hash) we look it up in
603     # our local cache first, since that's cheaper. (We don't want to
604     # do that with tags/branches though -- those change over time, so
605     # they should always be resolved by the remote repo.)
606     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
607       # Hide stderr because it's normal for this to fail:
608       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
609       if ($? == 0 &&
610           # Careful not to resolve a branch named abcdeff to commit 1234567:
611           $sha1 =~ /^$treeish/ &&
612           $sha1 =~ /^([0-9a-f]{40})$/s) {
613         $commit = $1;
614         Log(undef, "Commit $commit already present in $local_repo");
615       }
616     }
617
618     if (!defined $commit) {
619       # If $treeish isn't just a hash or abbrev hash, or isn't here
620       # yet, we need to fetch the remote to resolve it correctly.
621
622       # First, remove all local heads. This prevents a name that does
623       # not exist on the remote from resolving to (or colliding with)
624       # a previously fetched branch or tag (possibly from a different
625       # remote).
626       remove_tree("$local_repo/refs/heads", {keep_root => 1});
627
628       Log(undef, "Fetching objects from $repo to $local_repo");
629       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
630       if ($?) {
631         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
632       }
633     }
634
635     # Now that the data is all here, we will use our local repo for
636     # the rest of our git activities.
637     $repo = $local_repo;
638   }
639
640   my $gitcmd = "git --git-dir=\Q$repo\E";
641   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
642   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
643     croak("`$gitcmd rev-list` exited "
644           .exit_status_s($?)
645           .", '$treeish' not found, giving up");
646   }
647   $commit = $1;
648   Log(undef, "Version $treeish is commit $commit");
649
650   if ($commit ne $Job->{'script_version'}) {
651     # Record the real commit id in the database, frozentokey, logs,
652     # etc. -- instead of an abbreviation or a branch name which can
653     # become ambiguous or point to a different commit in the future.
654     if (!$Job->update_attributes('script_version' => $commit)) {
655       croak("Error: failed to update job's script_version attribute");
656     }
657   }
658
659   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
660   add_git_archive("$gitcmd archive ''\Q$commit\E");
661 }
662
663 my $git_archive = combined_git_archive();
664 if (!defined $git_archive) {
665   Log(undef, "Skip install phase (no git archive)");
666   if ($have_slurm) {
667     Log(undef, "Warning: This probably means workers have no source tree!");
668   }
669 }
670 else {
671   my $exited;
672   my $install_script_tries_left = 3;
673   for (my $attempts = 0; $attempts < 3; $attempts++) {
674     my @srunargs = ("srun",
675                     "--nodelist=$nodelist",
676                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
677     my @execargs = ("sh", "-c",
678                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
679
680     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
681     my ($stdout, $stderr);
682     ($exited, $stdout, $stderr) = srun_sync(
683       \@srunargs, \@execargs,
684       {label => "run install script on all workers"},
685       $build_script . $git_archive);
686
687     my $stderr_anything_from_script = 0;
688     for my $line (split(/\n/, $stderr)) {
689       if ($line !~ /^(srun: error: |starting: \[)/) {
690         $stderr_anything_from_script = 1;
691       }
692     }
693
694     last if $exited == 0 || $main::please_freeze;
695
696     # If the install script fails but doesn't print an error message,
697     # the next thing anyone is likely to do is just run it again in
698     # case it was a transient problem like "slurm communication fails
699     # because the network isn't reliable enough". So we'll just do
700     # that ourselves (up to 3 attempts in total). OTOH, if there is an
701     # error message, the problem is more likely to have a real fix and
702     # we should fail the job so the fixing process can start, instead
703     # of doing 2 more attempts.
704     last if $stderr_anything_from_script;
705   }
706
707   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
708     unlink($tar_filename);
709   }
710
711   if ($exited != 0) {
712     croak("Giving up");
713   }
714 }
715
716 foreach (qw (script script_version script_parameters runtime_constraints))
717 {
718   Log (undef,
719        "$_ " .
720        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
721 }
722 foreach (split (/\n/, $Job->{knobs}))
723 {
724   Log (undef, "knob " . $_);
725 }
726 my $resp = api_call(
727   'nodes/list',
728   'filters' => [['hostname', 'in', \@node]],
729   'order' => 'hostname',
730   'limit' => scalar(@node),
731     );
732 for my $n (@{$resp->{items}}) {
733   Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
734 }
735
736
737
738 $main::success = undef;
739
740
741
742 ONELEVEL:
743
744 my $thisround_succeeded = 0;
745 my $thisround_failed = 0;
746 my $thisround_failed_multiple = 0;
747 my $working_slot_count = scalar(@slot);
748
749 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
750                        or $a <=> $b } @jobstep_todo;
751 my $level = $jobstep[$jobstep_todo[0]]->{level};
752
753 my $initial_tasks_this_level = 0;
754 foreach my $id (@jobstep_todo) {
755   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
756 }
757
758 # If the number of tasks scheduled at this level #T is smaller than the number
759 # of slots available #S, only use the first #T slots, or the first slot on
760 # each node, whichever number is greater.
761 #
762 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
763 # based on these numbers.  Using fewer slots makes more resources available
764 # to each individual task, which should normally be a better strategy when
765 # there are fewer of them running with less parallelism.
766 #
767 # Note that this calculation is not redone if the initial tasks at
768 # this level queue more tasks at the same level.  This may harm
769 # overall task throughput for that level.
770 my @freeslot;
771 if ($initial_tasks_this_level < @node) {
772   @freeslot = (0..$#node);
773 } elsif ($initial_tasks_this_level < @slot) {
774   @freeslot = (0..$initial_tasks_this_level - 1);
775 } else {
776   @freeslot = (0..$#slot);
777 }
778 my $round_num_freeslots = scalar(@freeslot);
779 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
780
781 my %round_max_slots = ();
782 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
783   my $this_slot = $slot[$freeslot[$ii]];
784   my $node_name = $this_slot->{node}->{name};
785   $round_max_slots{$node_name} ||= $this_slot->{cpu};
786   last if (scalar(keys(%round_max_slots)) >= @node);
787 }
788
789 Log(undef, "start level $level with $round_num_freeslots slots");
790 my @holdslot;
791 my %reader;
792 my $progress_is_dirty = 1;
793 my $progress_stats_updated = 0;
794
795 update_progress_stats();
796
797
798 THISROUND:
799 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
800 {
801   # Don't create new tasks if we already know the job's final result.
802   last if defined($main::success);
803
804   my $id = $jobstep_todo[$todo_ptr];
805   my $Jobstep = $jobstep[$id];
806   if ($Jobstep->{level} != $level)
807   {
808     next;
809   }
810
811   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
812   set_nonblocking($reader{$id});
813
814   my $childslot = $freeslot[0];
815   my $childnode = $slot[$childslot]->{node};
816   my $childslotname = join (".",
817                             $slot[$childslot]->{node}->{name},
818                             $slot[$childslot]->{cpu});
819
820   my $childpid = fork();
821   if ($childpid == 0)
822   {
823     $SIG{'INT'} = 'DEFAULT';
824     $SIG{'QUIT'} = 'DEFAULT';
825     $SIG{'TERM'} = 'DEFAULT';
826
827     foreach (values (%reader))
828     {
829       close($_);
830     }
831     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
832     open(STDOUT,">&writer");
833     open(STDERR,">&writer");
834
835     undef $dbh;
836     undef $sth;
837
838     delete $ENV{"GNUPGHOME"};
839     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
840     $ENV{"TASK_QSEQUENCE"} = $id;
841     $ENV{"TASK_SEQUENCE"} = $level;
842     $ENV{"JOB_SCRIPT"} = $Job->{script};
843     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
844       $param =~ tr/a-z/A-Z/;
845       $ENV{"JOB_PARAMETER_$param"} = $value;
846     }
847     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
848     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
849     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
850     $ENV{"HOME"} = $ENV{"TASK_WORK"};
851     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
852     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
853     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
854
855     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
856
857     $ENV{"GZIP"} = "-n";
858
859     my @srunargs = (
860       "srun",
861       "--nodelist=".$childnode->{name},
862       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
863       "--job-name=$job_id.$id.$$",
864         );
865
866     my $stdbuf = " stdbuf --output=0 --error=0 ";
867
868     my $arv_file_cache = "";
869     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
870       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
871     }
872
873     my $command =
874         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
875         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
876         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
877         # These environment variables get used explicitly later in
878         # $command.  No tool is expected to read these values directly.
879         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
880         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
881         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
882         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
883         .q{&& declare -a VOLUMES=() }
884         .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
885         .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
886         .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
887
888     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
889     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
890     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
891
892     if ($docker_hash)
893     {
894       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
895       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
896       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
897       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
898       # We only set memory limits if Docker lets us limit both memory and swap.
899       # Memory limits alone have been supported longer, but subprocesses tend
900       # to get SIGKILL if they exceed that without any swap limit set.
901       # See #5642 for additional background.
902       if ($docker_limitmem) {
903         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
904       }
905
906       # The source tree and $destdir directory (which we have
907       # installed on the worker host) are available in the container,
908       # under the same path.
909       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
910       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
911
912       # Currently, we make the "by_pdh" directory in arv-mount's mount
913       # point appear at /keep inside the container (instead of using
914       # the same path as the host like we do with CRUNCH_SRC and
915       # CRUNCH_INSTALL). However, crunch scripts and utilities must
916       # not rely on this. They must use $TASK_KEEPMOUNT.
917       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
918       $ENV{TASK_KEEPMOUNT} = "/keep";
919
920       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
921       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
922       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
923
924       # TASK_WORK is almost exactly like a docker data volume: it
925       # starts out empty, is writable, and persists until no
926       # containers use it any more. We don't use --volumes-from to
927       # share it with other containers: it is only accessible to this
928       # task, and it goes away when this task stops.
929       #
930       # However, a docker data volume is writable only by root unless
931       # the mount point already happens to exist in the container with
932       # different permissions. Therefore, we [1] assume /tmp already
933       # exists in the image and is writable by the crunch user; [2]
934       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
935       # writable if they are created by docker while setting up the
936       # other --volumes); and [3] create $TASK_WORK inside the
937       # container using $build_script.
938       $command .= "--volume=/tmp ";
939       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
940       $ENV{"HOME"} = $ENV{"TASK_WORK"};
941       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
942
943       # TODO: Share a single JOB_WORK volume across all task
944       # containers on a given worker node, and delete it when the job
945       # ends (and, in case that doesn't work, when the next job
946       # starts).
947       #
948       # For now, use the same approach as TASK_WORK above.
949       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
950
951       # Bind mount the crunchrunner binary and host TLS certificates file into
952       # the container.
953       $command .= '"${VOLUMES[@]}" ';
954
955       while (my ($env_key, $env_val) = each %ENV)
956       {
957         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
958           $command .= "--env=\Q$env_key=$env_val\E ";
959         }
960       }
961       $command .= "--env=\QHOME=$ENV{HOME}\E ";
962       $command .= "\Q$docker_hash\E ";
963
964       if ($Job->{arvados_sdk_version}) {
965         $command .= $stdbuf;
966         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
967       } else {
968         $command .= "/bin/sh -c \'python -c " .
969             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
970             ">&2 2>/dev/null; " .
971             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
972             "if which stdbuf >/dev/null ; then " .
973             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
974             " else " .
975             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
976             " fi\'";
977       }
978     } else {
979       # Non-docker run
980       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
981       $command .= $stdbuf;
982       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
983     }
984
985     my @execargs = ('bash', '-c', $command);
986     srun (\@srunargs, \@execargs, undef, $build_script);
987     # exec() failed, we assume nothing happened.
988     die "srun() failed on build script\n";
989   }
990   close("writer");
991   if (!defined $childpid)
992   {
993     close $reader{$id};
994     delete $reader{$id};
995     next;
996   }
997   shift @freeslot;
998   $proc{$childpid} = {
999     jobstepidx => $id,
1000     time => time,
1001     slot => $childslot,
1002     jobstepname => "$job_id.$id.$childpid",
1003   };
1004   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1005   $slot[$childslot]->{pid} = $childpid;
1006
1007   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1008   Log ($id, "child $childpid started on $childslotname");
1009   $Jobstep->{starttime} = time;
1010   $Jobstep->{node} = $childnode->{name};
1011   $Jobstep->{slotindex} = $childslot;
1012   delete $Jobstep->{stderr};
1013   delete $Jobstep->{finishtime};
1014   delete $Jobstep->{tempfail};
1015
1016   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1017   $Jobstep->{'arvados_task'}->save;
1018
1019   splice @jobstep_todo, $todo_ptr, 1;
1020   --$todo_ptr;
1021
1022   $progress_is_dirty = 1;
1023
1024   while (!@freeslot
1025          ||
1026          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1027   {
1028     last THISROUND if $main::please_freeze;
1029     if ($main::please_info)
1030     {
1031       $main::please_info = 0;
1032       freeze();
1033       create_output_collection();
1034       save_meta(1);
1035       update_progress_stats();
1036     }
1037     my $gotsome
1038         = readfrompipes ()
1039         + reapchildren ();
1040     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1041     {
1042       check_refresh_wanted();
1043       check_squeue();
1044       update_progress_stats();
1045     }
1046     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1047     {
1048       update_progress_stats();
1049     }
1050     if (!$gotsome) {
1051       select (undef, undef, undef, 0.1);
1052     }
1053     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1054                                         $_->{node}->{hold_count} < 4 } @slot);
1055     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1056         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1057     {
1058       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1059           .($thisround_failed+$thisround_succeeded)
1060           .") -- giving up on this round";
1061       Log (undef, $message);
1062       last THISROUND;
1063     }
1064
1065     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1066     for (my $i=$#freeslot; $i>=0; $i--) {
1067       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1068         push @holdslot, (splice @freeslot, $i, 1);
1069       }
1070     }
1071     for (my $i=$#holdslot; $i>=0; $i--) {
1072       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1073         push @freeslot, (splice @holdslot, $i, 1);
1074       }
1075     }
1076
1077     # give up if no nodes are succeeding
1078     if ($working_slot_count < 1) {
1079       Log(undef, "Every node has failed -- giving up");
1080       last THISROUND;
1081     }
1082   }
1083 }
1084
1085
1086 push @freeslot, splice @holdslot;
1087 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1088
1089
1090 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1091 while (%proc)
1092 {
1093   if ($main::please_continue) {
1094     $main::please_continue = 0;
1095     goto THISROUND;
1096   }
1097   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1098   readfrompipes ();
1099   if (!reapchildren())
1100   {
1101     check_refresh_wanted();
1102     check_squeue();
1103     update_progress_stats();
1104     select (undef, undef, undef, 0.1);
1105     killem (keys %proc) if $main::please_freeze;
1106   }
1107 }
1108
1109 update_progress_stats();
1110 freeze_if_want_freeze();
1111
1112
1113 if (!defined $main::success)
1114 {
1115   if (!@jobstep_todo) {
1116     $main::success = 1;
1117   } elsif ($working_slot_count < 1) {
1118     save_output_collection();
1119     save_meta();
1120     exit(EX_RETRY_UNLOCKED);
1121   } elsif ($thisround_succeeded == 0 &&
1122            ($thisround_failed == 0 || $thisround_failed > 4)) {
1123     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1124     Log (undef, $message);
1125     $main::success = 0;
1126   }
1127 }
1128
1129 goto ONELEVEL if !defined $main::success;
1130
1131
1132 release_allocation();
1133 freeze();
1134 my $collated_output = save_output_collection();
1135 Log (undef, "finish");
1136
1137 my $final_log = save_meta();
1138
1139 my $final_state;
1140 if ($collated_output && $final_log && $main::success) {
1141   $final_state = 'Complete';
1142 } else {
1143   $final_state = 'Failed';
1144 }
1145 $Job->update_attributes('state' => $final_state);
1146
1147 exit (($final_state eq 'Complete') ? 0 : 1);
1148
1149
1150
1151 sub update_progress_stats
1152 {
1153   $progress_stats_updated = time;
1154   return if !$progress_is_dirty;
1155   my ($todo, $done, $running) = (scalar @jobstep_todo,
1156                                  scalar @jobstep_done,
1157                                  scalar keys(%proc));
1158   $Job->{'tasks_summary'} ||= {};
1159   $Job->{'tasks_summary'}->{'todo'} = $todo;
1160   $Job->{'tasks_summary'}->{'done'} = $done;
1161   $Job->{'tasks_summary'}->{'running'} = $running;
1162   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1163   Log (undef, "status: $done done, $running running, $todo todo");
1164   $progress_is_dirty = 0;
1165 }
1166
1167
1168
1169 sub reapchildren
1170 {
1171   my $children_reaped = 0;
1172   my @successful_task_uuids = ();
1173
1174   while((my $pid = waitpid (-1, WNOHANG)) > 0)
1175   {
1176     my $childstatus = $?;
1177
1178     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1179                     . "."
1180                     . $slot[$proc{$pid}->{slot}]->{cpu});
1181     my $jobstepidx = $proc{$pid}->{jobstepidx};
1182
1183     $children_reaped++;
1184     my $elapsed = time - $proc{$pid}->{time};
1185     my $Jobstep = $jobstep[$jobstepidx];
1186
1187     my $exitvalue = $childstatus >> 8;
1188     my $exitinfo = "exit ".exit_status_s($childstatus);
1189     $Jobstep->{'arvados_task'}->reload;
1190     my $task_success = $Jobstep->{'arvados_task'}->{success};
1191
1192     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1193
1194     if (!defined $task_success) {
1195       # task did not indicate one way or the other --> fail
1196       Log($jobstepidx, sprintf(
1197             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1198             exit_status_s($childstatus)));
1199       $Jobstep->{'arvados_task'}->{success} = 0;
1200       $Jobstep->{'arvados_task'}->save;
1201       $task_success = 0;
1202     }
1203
1204     if (!$task_success)
1205     {
1206       my $temporary_fail;
1207       $temporary_fail ||= $Jobstep->{tempfail};
1208       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1209
1210       ++$thisround_failed;
1211       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1212
1213       # Check for signs of a failed or misconfigured node
1214       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1215           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1216         # Don't count this against jobstep failure thresholds if this
1217         # node is already suspected faulty and srun exited quickly
1218         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1219             $elapsed < 5) {
1220           Log ($jobstepidx, "blaming failure on suspect node " .
1221                $slot[$proc{$pid}->{slot}]->{node}->{name});
1222           $temporary_fail ||= 1;
1223         }
1224         ban_node_by_slot($proc{$pid}->{slot});
1225       }
1226
1227       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1228                                 ++$Jobstep->{'failures'},
1229                                 $temporary_fail ? 'temporary' : 'permanent',
1230                                 $elapsed));
1231
1232       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1233         # Give up on this task, and the whole job
1234         $main::success = 0;
1235       }
1236       # Put this task back on the todo queue
1237       push @jobstep_todo, $jobstepidx;
1238       $Job->{'tasks_summary'}->{'failed'}++;
1239     }
1240     else # task_success
1241     {
1242       push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1243       ++$thisround_succeeded;
1244       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1245       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1246       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1247       push @jobstep_done, $jobstepidx;
1248       Log ($jobstepidx, "success in $elapsed seconds");
1249     }
1250     $Jobstep->{exitcode} = $childstatus;
1251     $Jobstep->{finishtime} = time;
1252     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1253     $Jobstep->{'arvados_task'}->save;
1254     process_stderr_final ($jobstepidx);
1255     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1256                               length($Jobstep->{'arvados_task'}->{output}),
1257                               $Jobstep->{'arvados_task'}->{output}));
1258
1259     close $reader{$jobstepidx};
1260     delete $reader{$jobstepidx};
1261     delete $slot[$proc{$pid}->{slot}]->{pid};
1262     push @freeslot, $proc{$pid}->{slot};
1263     delete $proc{$pid};
1264
1265     $progress_is_dirty = 1;
1266   }
1267
1268   if (scalar(@successful_task_uuids) > 0)
1269   {
1270     Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1271     # Load new tasks
1272     my $newtask_list = [];
1273     my $newtask_results;
1274     do {
1275       $newtask_results = api_call(
1276         "job_tasks/list",
1277         'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1278         'order' => 'qsequence',
1279         'offset' => scalar(@$newtask_list),
1280           );
1281       push(@$newtask_list, @{$newtask_results->{items}});
1282     } while (@{$newtask_results->{items}});
1283     Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1284     foreach my $arvados_task (@$newtask_list) {
1285       my $jobstep = {
1286         'level' => $arvados_task->{'sequence'},
1287         'failures' => 0,
1288         'arvados_task' => $arvados_task
1289       };
1290       push @jobstep, $jobstep;
1291       push @jobstep_todo, $#jobstep;
1292     }
1293   }
1294
1295   return $children_reaped;
1296 }
1297
1298 sub check_refresh_wanted
1299 {
1300   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1301   if (@stat &&
1302       $stat[9] > $latest_refresh &&
1303       # ...and we have actually locked the job record...
1304       $job_id eq $Job->{'uuid'}) {
1305     $latest_refresh = scalar time;
1306     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1307     for my $attr ('cancelled_at',
1308                   'cancelled_by_user_uuid',
1309                   'cancelled_by_client_uuid',
1310                   'state') {
1311       $Job->{$attr} = $Job2->{$attr};
1312     }
1313     if ($Job->{'state'} ne "Running") {
1314       if ($Job->{'state'} eq "Cancelled") {
1315         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1316       } else {
1317         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1318       }
1319       $main::success = 0;
1320       $main::please_freeze = 1;
1321     }
1322   }
1323 }
1324
1325 sub check_squeue
1326 {
1327   my $last_squeue_check = $squeue_checked;
1328
1329   # Do not call `squeue` or check the kill list more than once every
1330   # 15 seconds.
1331   return if $last_squeue_check > time - 15;
1332   $squeue_checked = time;
1333
1334   # Look for children from which we haven't received stderr data since
1335   # the last squeue check. If no such children exist, all procs are
1336   # alive and there's no need to even look at squeue.
1337   #
1338   # As long as the crunchstat poll interval (10s) is shorter than the
1339   # squeue check interval (15s) this should make the squeue check an
1340   # infrequent event.
1341   my $silent_procs = 0;
1342   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1343   {
1344     if (!exists($js->{stderr_at}))
1345     {
1346       $js->{stderr_at} = 0;
1347     }
1348     if ($js->{stderr_at} < $last_squeue_check)
1349     {
1350       $silent_procs++;
1351     }
1352   }
1353   return if $silent_procs == 0;
1354
1355   # use killem() on procs whose killtime is reached
1356   while (my ($pid, $procinfo) = each %proc)
1357   {
1358     my $js = $jobstep[$procinfo->{jobstepidx}];
1359     if (exists $procinfo->{killtime}
1360         && $procinfo->{killtime} <= time
1361         && $js->{stderr_at} < $last_squeue_check)
1362     {
1363       my $sincewhen = "";
1364       if ($js->{stderr_at}) {
1365         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1366       }
1367       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1368       killem ($pid);
1369     }
1370   }
1371
1372   if (!$have_slurm)
1373   {
1374     # here is an opportunity to check for mysterious problems with local procs
1375     return;
1376   }
1377
1378   # Get a list of steps still running.  Note: squeue(1) says --steps
1379   # selects a format (which we override anyway) and allows us to
1380   # specify which steps we're interested in (which we don't).
1381   # Importantly, it also changes the meaning of %j from "job name" to
1382   # "step name" and (although this isn't mentioned explicitly in the
1383   # docs) switches from "one line per job" mode to "one line per step"
1384   # mode. Without it, we'd just get a list of one job, instead of a
1385   # list of N steps.
1386   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1387   if ($? != 0)
1388   {
1389     Log(undef, "warning: squeue exit status $? ($!)");
1390     return;
1391   }
1392   chop @squeue;
1393
1394   # which of my jobsteps are running, according to squeue?
1395   my %ok;
1396   for my $jobstepname (@squeue)
1397   {
1398     $ok{$jobstepname} = 1;
1399   }
1400
1401   # Check for child procs >60s old and not mentioned by squeue.
1402   while (my ($pid, $procinfo) = each %proc)
1403   {
1404     if ($procinfo->{time} < time - 60
1405         && $procinfo->{jobstepname}
1406         && !exists $ok{$procinfo->{jobstepname}}
1407         && !exists $procinfo->{killtime})
1408     {
1409       # According to slurm, this task has ended (successfully or not)
1410       # -- but our srun child hasn't exited. First we must wait (30
1411       # seconds) in case this is just a race between communication
1412       # channels. Then, if our srun child process still hasn't
1413       # terminated, we'll conclude some slurm communication
1414       # error/delay has caused the task to die without notifying srun,
1415       # and we'll kill srun ourselves.
1416       $procinfo->{killtime} = time + 30;
1417       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1418     }
1419   }
1420 }
1421
1422 sub check_sinfo
1423 {
1424   # If a node fails in a multi-node "srun" call during job setup, the call
1425   # may hang instead of exiting with a nonzero code.  This function checks
1426   # "sinfo" for the health of the nodes that were allocated and ensures that
1427   # they are all still in the "alloc" state.  If a node that is allocated to
1428   # this job is not in "alloc" state, then set please_freeze.
1429   #
1430   # This is only called from srun_sync() for node configuration.  If a
1431   # node fails doing actual work, there are other recovery mechanisms.
1432
1433   # Do not call `sinfo` more than once every 15 seconds.
1434   return if $sinfo_checked > time - 15;
1435   $sinfo_checked = time;
1436
1437   # The output format "%t" means output node states.
1438   my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
1439   if ($? != 0)
1440   {
1441     Log(undef, "warning: sinfo exit status $? ($!)");
1442     return;
1443   }
1444   chop @sinfo;
1445
1446   foreach (@sinfo)
1447   {
1448     if ($_ != "alloc" && $_ != "alloc*") {
1449       $main::please_freeze = 1;
1450     }
1451   }
1452 }
1453
1454 sub release_allocation
1455 {
1456   if ($have_slurm)
1457   {
1458     Log (undef, "release job allocation");
1459     system "scancel $ENV{SLURM_JOB_ID}";
1460   }
1461 }
1462
1463
1464 sub readfrompipes
1465 {
1466   my $gotsome = 0;
1467   my %fd_job;
1468   my $sel = IO::Select->new();
1469   foreach my $jobstepidx (keys %reader)
1470   {
1471     my $fd = $reader{$jobstepidx};
1472     $sel->add($fd);
1473     $fd_job{$fd} = $jobstepidx;
1474
1475     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1476       $sel->add($stdout_fd);
1477       $fd_job{$stdout_fd} = $jobstepidx;
1478     }
1479   }
1480   # select on all reader fds with 0.1s timeout
1481   my @ready_fds = $sel->can_read(0.1);
1482   foreach my $fd (@ready_fds)
1483   {
1484     my $buf;
1485     if (0 < sysread ($fd, $buf, 65536))
1486     {
1487       $gotsome = 1;
1488       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1489
1490       my $jobstepidx = $fd_job{$fd};
1491       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1492         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1493         next;
1494       }
1495
1496       $jobstep[$jobstepidx]->{stderr_at} = time;
1497       $jobstep[$jobstepidx]->{stderr} .= $buf;
1498
1499       # Consume everything up to the last \n
1500       preprocess_stderr ($jobstepidx);
1501
1502       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1503       {
1504         # If we get a lot of stderr without a newline, chop off the
1505         # front to avoid letting our buffer grow indefinitely.
1506         substr ($jobstep[$jobstepidx]->{stderr},
1507                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1508       }
1509     }
1510   }
1511   return $gotsome;
1512 }
1513
1514
1515 # Consume all full lines of stderr for a jobstep. Everything after the
1516 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1517 # returning.
1518 sub preprocess_stderr
1519 {
1520   my $jobstepidx = shift;
1521   # slotindex is only defined for children running Arvados job tasks.
1522   # Be prepared to handle the undef case (for setup srun calls, etc.).
1523   my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1524
1525   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1526     my $line = $1;
1527     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1528     Log ($jobstepidx, "stderr $line");
1529     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
1530       # If the allocation is revoked, we can't possibly continue, so mark all
1531       # nodes as failed.  This will cause the overall exit code to be
1532       # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
1533       # this job.
1534       $main::please_freeze = 1;
1535       foreach my $st (@slot) {
1536         $st->{node}->{fail_count}++;
1537       }
1538     }
1539     elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
1540       $jobstep[$jobstepidx]->{tempfail} = 1;
1541       if (defined($job_slot_index)) {
1542         $slot[$job_slot_index]->{node}->{fail_count}++;
1543         ban_node_by_slot($job_slot_index);
1544       }
1545     }
1546     elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
1547       $jobstep[$jobstepidx]->{tempfail} = 1;
1548       ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
1549     }
1550     elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1551       $jobstep[$jobstepidx]->{tempfail} = 1;
1552     }
1553   }
1554 }
1555
1556
1557 sub process_stderr_final
1558 {
1559   my $jobstepidx = shift;
1560   preprocess_stderr ($jobstepidx);
1561
1562   map {
1563     Log ($jobstepidx, "stderr $_");
1564   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1565   $jobstep[$jobstepidx]->{stderr} = '';
1566 }
1567
1568 sub fetch_block
1569 {
1570   my $hash = shift;
1571   my $keep;
1572   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1573     Log(undef, "fetch_block run error from arv-get $hash: $!");
1574     return undef;
1575   }
1576   my $output_block = "";
1577   while (1) {
1578     my $buf;
1579     my $bytes = sysread($keep, $buf, 1024 * 1024);
1580     if (!defined $bytes) {
1581       Log(undef, "fetch_block read error from arv-get: $!");
1582       $output_block = undef;
1583       last;
1584     } elsif ($bytes == 0) {
1585       # sysread returns 0 at the end of the pipe.
1586       last;
1587     } else {
1588       # some bytes were read into buf.
1589       $output_block .= $buf;
1590     }
1591   }
1592   close $keep;
1593   if ($?) {
1594     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1595     $output_block = undef;
1596   }
1597   return $output_block;
1598 }
1599
1600 # Create a collection by concatenating the output of all tasks (each
1601 # task's output is either a manifest fragment, a locator for a
1602 # manifest fragment stored in Keep, or nothing at all). Return the
1603 # portable_data_hash of the new collection.
1604 sub create_output_collection
1605 {
1606   Log (undef, "collate");
1607
1608   my ($child_out, $child_in);
1609   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1610 import arvados
1611 import sys
1612 print (arvados.api("v1").collections().
1613        create(body={"manifest_text": sys.stdin.read(),
1614                     "owner_uuid": sys.argv[2]}).
1615        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1616 }, retry_count(), $Job->{owner_uuid});
1617
1618   my $task_idx = -1;
1619   my $manifest_size = 0;
1620   for (@jobstep)
1621   {
1622     ++$task_idx;
1623     my $output = $_->{'arvados_task'}->{output};
1624     next if (!defined($output));
1625     my $next_write;
1626     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1627       $next_write = fetch_block($output);
1628     } else {
1629       $next_write = $output;
1630     }
1631     if (defined($next_write)) {
1632       if (!defined(syswrite($child_in, $next_write))) {
1633         # There's been an error writing.  Stop the loop.
1634         # We'll log details about the exit code later.
1635         last;
1636       } else {
1637         $manifest_size += length($next_write);
1638       }
1639     } else {
1640       my $uuid = $_->{'arvados_task'}->{'uuid'};
1641       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1642       $main::success = 0;
1643     }
1644   }
1645   close($child_in);
1646   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1647
1648   my $joboutput;
1649   my $s = IO::Select->new($child_out);
1650   if ($s->can_read(120)) {
1651     sysread($child_out, $joboutput, 1024 * 1024);
1652     waitpid($pid, 0);
1653     if ($?) {
1654       Log(undef, "output collection creation exited " . exit_status_s($?));
1655       $joboutput = undef;
1656     } else {
1657       chomp($joboutput);
1658     }
1659   } else {
1660     Log (undef, "timed out while creating output collection");
1661     foreach my $signal (2, 2, 2, 15, 15, 9) {
1662       kill($signal, $pid);
1663       last if waitpid($pid, WNOHANG) == -1;
1664       sleep(1);
1665     }
1666   }
1667   close($child_out);
1668
1669   return $joboutput;
1670 }
1671
1672 # Calls create_output_collection, logs the result, and returns it.
1673 # If that was successful, save that as the output in the job record.
1674 sub save_output_collection {
1675   my $collated_output = create_output_collection();
1676
1677   if (!$collated_output) {
1678     Log(undef, "Failed to write output collection");
1679   }
1680   else {
1681     Log(undef, "job output $collated_output");
1682     $Job->update_attributes('output' => $collated_output);
1683   }
1684   return $collated_output;
1685 }
1686
1687 sub killem
1688 {
1689   foreach (@_)
1690   {
1691     my $sig = 2;                # SIGINT first
1692     if (exists $proc{$_}->{"sent_$sig"} &&
1693         time - $proc{$_}->{"sent_$sig"} > 4)
1694     {
1695       $sig = 15;                # SIGTERM if SIGINT doesn't work
1696     }
1697     if (exists $proc{$_}->{"sent_$sig"} &&
1698         time - $proc{$_}->{"sent_$sig"} > 4)
1699     {
1700       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1701     }
1702     if (!exists $proc{$_}->{"sent_$sig"})
1703     {
1704       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1705       kill $sig, $_;
1706       select (undef, undef, undef, 0.1);
1707       if ($sig == 2)
1708       {
1709         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1710       }
1711       $proc{$_}->{"sent_$sig"} = time;
1712       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1713     }
1714   }
1715 }
1716
1717
1718 sub fhbits
1719 {
1720   my($bits);
1721   for (@_) {
1722     vec($bits,fileno($_),1) = 1;
1723   }
1724   $bits;
1725 }
1726
1727
1728 # Send log output to Keep via arv-put.
1729 #
1730 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1731 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1732 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1733 # $log_pipe_pid is the pid of the arv-put subprocess.
1734 #
1735 # The only functions that should access these variables directly are:
1736 #
1737 # log_writer_start($logfilename)
1738 #     Starts an arv-put pipe, reading data on stdin and writing it to
1739 #     a $logfilename file in an output collection.
1740 #
1741 # log_writer_read_output([$timeout])
1742 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1743 #     Passes $timeout to the select() call, with a default of 0.01.
1744 #     Returns the result of the last read() call on $log_pipe_out, or
1745 #     -1 if read() wasn't called because select() timed out.
1746 #     Only other log_writer_* functions should need to call this.
1747 #
1748 # log_writer_send($txt)
1749 #     Writes $txt to the output log collection.
1750 #
1751 # log_writer_finish()
1752 #     Closes the arv-put pipe and returns the output that it produces.
1753 #
1754 # log_writer_is_active()
1755 #     Returns a true value if there is currently a live arv-put
1756 #     process, false otherwise.
1757 #
1758 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1759     $log_pipe_pid);
1760
1761 sub log_writer_start($)
1762 {
1763   my $logfilename = shift;
1764   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1765                         'arv-put',
1766                         '--stream',
1767                         '--retries', '6',
1768                         '--filename', $logfilename,
1769                         '-');
1770   $log_pipe_out_buf = "";
1771   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1772 }
1773
1774 sub log_writer_read_output {
1775   my $timeout = shift || 0.01;
1776   my $read = -1;
1777   while ($read && $log_pipe_out_select->can_read($timeout)) {
1778     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1779                  length($log_pipe_out_buf));
1780   }
1781   if (!defined($read)) {
1782     Log(undef, "error reading log manifest from arv-put: $!");
1783   }
1784   return $read;
1785 }
1786
1787 sub log_writer_send($)
1788 {
1789   my $txt = shift;
1790   print $log_pipe_in $txt;
1791   log_writer_read_output();
1792 }
1793
1794 sub log_writer_finish()
1795 {
1796   return unless $log_pipe_pid;
1797
1798   close($log_pipe_in);
1799
1800   my $logger_failed = 0;
1801   my $read_result = log_writer_read_output(600);
1802   if ($read_result == -1) {
1803     $logger_failed = -1;
1804     Log (undef, "timed out reading from 'arv-put'");
1805   } elsif ($read_result != 0) {
1806     $logger_failed = -2;
1807     Log(undef, "failed to read arv-put log manifest to EOF");
1808   }
1809
1810   waitpid($log_pipe_pid, 0);
1811   if ($?) {
1812     $logger_failed ||= $?;
1813     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1814   }
1815
1816   close($log_pipe_out);
1817   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1818   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1819       $log_pipe_out_select = undef;
1820
1821   return $arv_put_output;
1822 }
1823
1824 sub log_writer_is_active() {
1825   return $log_pipe_pid;
1826 }
1827
1828 sub Log                         # ($jobstepidx, $logmessage)
1829 {
1830   my ($jobstepidx, $logmessage) = @_;
1831   if ($logmessage =~ /\n/) {
1832     for my $line (split (/\n/, $_[1])) {
1833       Log ($jobstepidx, $line);
1834     }
1835     return;
1836   }
1837   my $fh = select STDERR; $|=1; select $fh;
1838   my $task_qseq = '';
1839   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1840     $task_qseq = $jobstepidx;
1841   }
1842   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1843   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1844   $message .= "\n";
1845   my $datetime;
1846   if (log_writer_is_active() || -t STDERR) {
1847     my @gmtime = gmtime;
1848     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1849                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1850   }
1851   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1852
1853   if (log_writer_is_active()) {
1854     log_writer_send($datetime . " " . $message);
1855   }
1856 }
1857
1858
1859 sub croak
1860 {
1861   my ($package, $file, $line) = caller;
1862   my $message = "@_ at $file line $line\n";
1863   Log (undef, $message);
1864   release_allocation();
1865   freeze() if @jobstep_todo;
1866   create_output_collection() if @jobstep_todo;
1867   cleanup();
1868   save_meta();
1869   die;
1870 }
1871
1872
1873 sub cleanup
1874 {
1875   return unless $Job;
1876   if ($Job->{'state'} eq 'Cancelled') {
1877     $Job->update_attributes('finished_at' => scalar gmtime);
1878   } else {
1879     $Job->update_attributes('state' => 'Failed');
1880   }
1881 }
1882
1883
1884 sub save_meta
1885 {
1886   my $justcheckpoint = shift; # false if this will be the last meta saved
1887   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1888   return unless log_writer_is_active();
1889   my $log_manifest = log_writer_finish();
1890   return unless defined($log_manifest);
1891
1892   if ($Job->{log}) {
1893     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1894     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1895   }
1896
1897   my $log_coll = api_call(
1898     "collections/create", ensure_unique_name => 1, collection => {
1899       manifest_text => $log_manifest,
1900       owner_uuid => $Job->{owner_uuid},
1901       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1902     });
1903   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1904   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1905
1906   return $log_coll->{portable_data_hash};
1907 }
1908
1909
1910 sub freeze_if_want_freeze
1911 {
1912   if ($main::please_freeze)
1913   {
1914     release_allocation();
1915     if (@_)
1916     {
1917       # kill some srun procs before freeze+stop
1918       map { $proc{$_} = {} } @_;
1919       while (%proc)
1920       {
1921         killem (keys %proc);
1922         select (undef, undef, undef, 0.1);
1923         my $died;
1924         while (($died = waitpid (-1, WNOHANG)) > 0)
1925         {
1926           delete $proc{$died};
1927         }
1928       }
1929     }
1930     freeze();
1931     create_output_collection();
1932     cleanup();
1933     save_meta();
1934     exit 1;
1935   }
1936 }
1937
1938
1939 sub freeze
1940 {
1941   Log (undef, "Freeze not implemented");
1942   return;
1943 }
1944
1945
1946 sub thaw
1947 {
1948   croak ("Thaw not implemented");
1949 }
1950
1951
1952 sub freezequote
1953 {
1954   my $s = shift;
1955   $s =~ s/\\/\\\\/g;
1956   $s =~ s/\n/\\n/g;
1957   return $s;
1958 }
1959
1960
1961 sub freezeunquote
1962 {
1963   my $s = shift;
1964   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1965   return $s;
1966 }
1967
1968 sub srun_sync
1969 {
1970   my $srunargs = shift;
1971   my $execargs = shift;
1972   my $opts = shift || {};
1973   my $stdin = shift;
1974
1975   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1976   Log (undef, "$label: start");
1977
1978   my ($stderr_r, $stderr_w);
1979   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1980
1981   my ($stdout_r, $stdout_w);
1982   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1983
1984   my $srunpid = fork();
1985   if ($srunpid == 0)
1986   {
1987     close($stderr_r);
1988     close($stdout_r);
1989     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1990     fcntl($stdout_w, F_SETFL, 0) or croak($!);
1991     open(STDERR, ">&", $stderr_w);
1992     open(STDOUT, ">&", $stdout_w);
1993     srun ($srunargs, $execargs, $opts, $stdin);
1994     exit (1);
1995   }
1996   close($stderr_w);
1997   close($stdout_w);
1998
1999   set_nonblocking($stderr_r);
2000   set_nonblocking($stdout_r);
2001
2002   # Add entries to @jobstep and %proc so check_squeue() and
2003   # freeze_if_want_freeze() can treat it like a job task process.
2004   push @jobstep, {
2005     stderr => '',
2006     stderr_at => 0,
2007     stderr_captured => '',
2008     stdout_r => $stdout_r,
2009     stdout_captured => '',
2010   };
2011   my $jobstepidx = $#jobstep;
2012   $proc{$srunpid} = {
2013     jobstepidx => $jobstepidx,
2014   };
2015   $reader{$jobstepidx} = $stderr_r;
2016
2017   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
2018     my $busy = readfrompipes();
2019     if (!$busy || ($latest_refresh + 2 < scalar time)) {
2020       check_refresh_wanted();
2021       check_squeue();
2022       check_sinfo();
2023     }
2024     if (!$busy) {
2025       select(undef, undef, undef, 0.1);
2026     }
2027     killem(keys %proc) if $main::please_freeze;
2028   }
2029   my $exited = $?;
2030
2031   1 while readfrompipes();
2032   process_stderr_final ($jobstepidx);
2033
2034   Log (undef, "$label: exit ".exit_status_s($exited));
2035
2036   close($stdout_r);
2037   close($stderr_r);
2038   delete $proc{$srunpid};
2039   delete $reader{$jobstepidx};
2040
2041   my $j = pop @jobstep;
2042   # If the srun showed signs of tempfail, ensure the caller treats that as a
2043   # failure case.
2044   if ($main::please_freeze || $j->{tempfail}) {
2045     $exited ||= 255;
2046   }
2047   return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
2048 }
2049
2050
2051 sub srun
2052 {
2053   my $srunargs = shift;
2054   my $execargs = shift;
2055   my $opts = shift || {};
2056   my $stdin = shift;
2057   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
2058
2059   $Data::Dumper::Terse = 1;
2060   $Data::Dumper::Indent = 0;
2061   my $show_cmd = Dumper($args);
2062   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2063   $show_cmd =~ s/\n/ /g;
2064   if ($opts->{fork}) {
2065     Log(undef, "starting: $show_cmd");
2066   } else {
2067     # This is a child process: parent is in charge of reading our
2068     # stderr and copying it to Log() if needed.
2069     warn "starting: $show_cmd\n";
2070   }
2071
2072   if (defined $stdin) {
2073     my $child = open STDIN, "-|";
2074     defined $child or die "no fork: $!";
2075     if ($child == 0) {
2076       print $stdin or die $!;
2077       close STDOUT or die $!;
2078       exit 0;
2079     }
2080   }
2081
2082   return system (@$args) if $opts->{fork};
2083
2084   exec @$args;
2085   warn "ENV size is ".length(join(" ",%ENV));
2086   die "exec failed: $!: @$args";
2087 }
2088
2089
2090 sub ban_node_by_slot {
2091   # Don't start any new jobsteps on this node for 60 seconds
2092   my $slotid = shift;
2093   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2094   $slot[$slotid]->{node}->{hold_count}++;
2095   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2096 }
2097
2098 sub must_lock_now
2099 {
2100   my ($lockfile, $error_message) = @_;
2101   open L, ">", $lockfile or croak("$lockfile: $!");
2102   if (!flock L, LOCK_EX|LOCK_NB) {
2103     croak("Can't lock $lockfile: $error_message\n");
2104   }
2105 }
2106
2107 sub find_docker_image {
2108   # Given a Keep locator, check to see if it contains a Docker image.
2109   # If so, return its stream name and Docker hash.
2110   # If not, return undef for both values.
2111   my $locator = shift;
2112   my ($streamname, $filename);
2113   my $image = api_call("collections/get", uuid => $locator);
2114   if ($image) {
2115     foreach my $line (split(/\n/, $image->{manifest_text})) {
2116       my @tokens = split(/\s+/, $line);
2117       next if (!@tokens);
2118       $streamname = shift(@tokens);
2119       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2120         if (defined($filename)) {
2121           return (undef, undef);  # More than one file in the Collection.
2122         } else {
2123           $filename = (split(/:/, $filedata, 3))[2];
2124         }
2125       }
2126     }
2127   }
2128   if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
2129     return ($streamname, $1);
2130   } else {
2131     return (undef, undef);
2132   }
2133 }
2134
2135 sub retry_count {
2136   # Calculate the number of times an operation should be retried,
2137   # assuming exponential backoff, and that we're willing to retry as
2138   # long as tasks have been running.  Enforce a minimum of 3 retries.
2139   my ($starttime, $endtime, $timediff, $retries);
2140   if (@jobstep) {
2141     $starttime = $jobstep[0]->{starttime};
2142     $endtime = $jobstep[-1]->{finishtime};
2143   }
2144   if (!defined($starttime)) {
2145     $timediff = 0;
2146   } elsif (!defined($endtime)) {
2147     $timediff = time - $starttime;
2148   } else {
2149     $timediff = ($endtime - $starttime) - (time - $endtime);
2150   }
2151   if ($timediff > 0) {
2152     $retries = int(log($timediff) / log(2));
2153   } else {
2154     $retries = 1;  # Use the minimum.
2155   }
2156   return ($retries > 3) ? $retries : 3;
2157 }
2158
2159 sub retry_op {
2160   # Pass in two function references.
2161   # This method will be called with the remaining arguments.
2162   # If it dies, retry it with exponential backoff until it succeeds,
2163   # or until the current retry_count is exhausted.  After each failure
2164   # that can be retried, the second function will be called with
2165   # the current try count (0-based), next try time, and error message.
2166   my $operation = shift;
2167   my $retry_callback = shift;
2168   my $retries = retry_count();
2169   foreach my $try_count (0..$retries) {
2170     my $next_try = time + (2 ** $try_count);
2171     my $result = eval { $operation->(@_); };
2172     if (!$@) {
2173       return $result;
2174     } elsif ($try_count < $retries) {
2175       $retry_callback->($try_count, $next_try, $@);
2176       my $sleep_time = $next_try - time;
2177       sleep($sleep_time) if ($sleep_time > 0);
2178     }
2179   }
2180   # Ensure the error message ends in a newline, so Perl doesn't add
2181   # retry_op's line number to it.
2182   chomp($@);
2183   die($@ . "\n");
2184 }
2185
2186 sub api_call {
2187   # Pass in a /-separated API method name, and arguments for it.
2188   # This function will call that method, retrying as needed until
2189   # the current retry_count is exhausted, with a log on the first failure.
2190   my $method_name = shift;
2191   my $log_api_retry = sub {
2192     my ($try_count, $next_try_at, $errmsg) = @_;
2193     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2194     $errmsg =~ s/\s/ /g;
2195     $errmsg =~ s/\s+$//;
2196     my $retry_msg;
2197     if ($next_try_at < time) {
2198       $retry_msg = "Retrying.";
2199     } else {
2200       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2201       $retry_msg = "Retrying at $next_try_fmt.";
2202     }
2203     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2204   };
2205   my $method = $arv;
2206   foreach my $key (split(/\//, $method_name)) {
2207     $method = $method->{$key};
2208   }
2209   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2210 }
2211
2212 sub exit_status_s {
2213   # Given a $?, return a human-readable exit code string like "0" or
2214   # "1" or "0 with signal 1" or "1 with signal 11".
2215   my $exitcode = shift;
2216   my $s = $exitcode >> 8;
2217   if ($exitcode & 0x7f) {
2218     $s .= " with signal " . ($exitcode & 0x7f);
2219   }
2220   if ($exitcode & 0x80) {
2221     $s .= " with core dump";
2222   }
2223   return $s;
2224 }
2225
2226 sub handle_readall {
2227   # Pass in a glob reference to a file handle.
2228   # Read all its contents and return them as a string.
2229   my $fh_glob_ref = shift;
2230   local $/ = undef;
2231   return <$fh_glob_ref>;
2232 }
2233
2234 sub tar_filename_n {
2235   my $n = shift;
2236   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2237 }
2238
2239 sub add_git_archive {
2240   # Pass in a git archive command as a string or list, a la system().
2241   # This method will save its output to be included in the archive sent to the
2242   # build script.
2243   my $git_input;
2244   $git_tar_count++;
2245   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2246     croak("Failed to save git archive: $!");
2247   }
2248   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2249   close($git_input);
2250   waitpid($git_pid, 0);
2251   close(GIT_ARCHIVE);
2252   if ($?) {
2253     croak("Failed to save git archive: git exited " . exit_status_s($?));
2254   }
2255 }
2256
2257 sub combined_git_archive {
2258   # Combine all saved tar archives into a single archive, then return its
2259   # contents in a string.  Return undef if no archives have been saved.
2260   if ($git_tar_count < 1) {
2261     return undef;
2262   }
2263   my $base_tar_name = tar_filename_n(1);
2264   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2265     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2266     if ($tar_exit != 0) {
2267       croak("Error preparing build archive: tar -A exited " .
2268             exit_status_s($tar_exit));
2269     }
2270   }
2271   if (!open(GIT_TAR, "<", $base_tar_name)) {
2272     croak("Could not open build archive: $!");
2273   }
2274   my $tar_contents = handle_readall(\*GIT_TAR);
2275   close(GIT_TAR);
2276   return $tar_contents;
2277 }
2278
2279 sub set_nonblocking {
2280   my $fh = shift;
2281   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2282   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2283 }
2284
2285 __DATA__
2286 #!/usr/bin/env perl
2287 #
2288 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2289 # server invokes this script on individual compute nodes, or localhost if we're
2290 # running a job locally.  It gets called in two modes:
2291 #
2292 # * No arguments: Installation mode.  Read a tar archive from the DATA
2293 #   file handle; it includes the Crunch script's source code, and
2294 #   maybe SDKs as well.  Those should be installed in the proper
2295 #   locations.  This runs outside of any Docker container, so don't try to
2296 #   introspect Crunch's runtime environment.
2297 #
2298 # * With arguments: Crunch script run mode.  This script should set up the
2299 #   environment, then run the command specified in the arguments.  This runs
2300 #   inside any Docker container.
2301
2302 use Fcntl ':flock';
2303 use File::Path qw( make_path remove_tree );
2304 use POSIX qw(getcwd);
2305
2306 use constant TASK_TEMPFAIL => 111;
2307
2308 # Map SDK subdirectories to the path environments they belong to.
2309 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2310
2311 my $destdir = $ENV{"CRUNCH_SRC"};
2312 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2313 my $repo = $ENV{"CRUNCH_SRC_URL"};
2314 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2315 my $job_work = $ENV{"JOB_WORK"};
2316 my $task_work = $ENV{"TASK_WORK"};
2317
2318 open(STDOUT_ORIG, ">&", STDOUT);
2319 open(STDERR_ORIG, ">&", STDERR);
2320
2321 for my $dir ($destdir, $job_work, $task_work) {
2322   if ($dir) {
2323     make_path $dir;
2324     -e $dir or die "Failed to create temporary directory ($dir): $!";
2325   }
2326 }
2327
2328 if ($task_work) {
2329   remove_tree($task_work, {keep_root => 1});
2330 }
2331
2332 ### Crunch script run mode
2333 if (@ARGV) {
2334   # We want to do routine logging during task 0 only.  This gives the user
2335   # the information they need, but avoids repeating the information for every
2336   # task.
2337   my $Log;
2338   if ($ENV{TASK_SEQUENCE} eq "0") {
2339     $Log = sub {
2340       my $msg = shift;
2341       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2342     };
2343   } else {
2344     $Log = sub { };
2345   }
2346
2347   my $python_src = "$install_dir/python";
2348   my $venv_dir = "$job_work/.arvados.venv";
2349   my $venv_built = -e "$venv_dir/bin/activate";
2350   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2351     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2352                  "--python=python2.7", $venv_dir);
2353     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2354     $venv_built = 1;
2355     $Log->("Built Python SDK virtualenv");
2356   }
2357
2358   my @pysdk_version_cmd = ("python", "-c",
2359     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2360   if ($venv_built) {
2361     $Log->("Running in Python SDK virtualenv");
2362     @pysdk_version_cmd = ();
2363     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2364     @ARGV = ("/bin/sh", "-ec",
2365              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2366   } elsif (-d $python_src) {
2367     $Log->("Warning: virtualenv not found inside Docker container default " .
2368            "\$PATH. Can't install Python SDK.");
2369   }
2370
2371   if (@pysdk_version_cmd) {
2372     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2373     my $pysdk_version = <$pysdk_version_pipe>;
2374     close($pysdk_version_pipe);
2375     if ($? == 0) {
2376       chomp($pysdk_version);
2377       $Log->("Using Arvados SDK version $pysdk_version");
2378     } else {
2379       # A lot could've gone wrong here, but pretty much all of it means that
2380       # Python won't be able to load the Arvados SDK.
2381       $Log->("Warning: Arvados SDK not found");
2382     }
2383   }
2384
2385   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2386     my $sdk_path = "$install_dir/$sdk_dir";
2387     if (-d $sdk_path) {
2388       if ($ENV{$sdk_envkey}) {
2389         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2390       } else {
2391         $ENV{$sdk_envkey} = $sdk_path;
2392       }
2393       $Log->("Arvados SDK added to %s", $sdk_envkey);
2394     }
2395   }
2396
2397   exec(@ARGV);
2398   die "Cannot exec `@ARGV`: $!";
2399 }
2400
2401 ### Installation mode
2402 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2403 flock L, LOCK_EX;
2404 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2405   # This exact git archive (source + arvados sdk) is already installed
2406   # here, so there's no need to reinstall it.
2407
2408   # We must consume our DATA section, though: otherwise the process
2409   # feeding it to us will get SIGPIPE.
2410   my $buf;
2411   while (read(DATA, $buf, 65536)) { }
2412
2413   exit(0);
2414 }
2415
2416 unlink "$destdir.archive_hash";
2417 mkdir $destdir;
2418
2419 do {
2420   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2421   local $SIG{PIPE} = "IGNORE";
2422   warn "Extracting archive: $archive_hash\n";
2423   # --ignore-zeros is necessary sometimes: depending on how much NUL
2424   # padding tar -A put on our combined archive (which in turn depends
2425   # on the length of the component archives) tar without
2426   # --ignore-zeros will exit before consuming stdin and cause close()
2427   # to fail on the resulting SIGPIPE.
2428   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2429     die "Error launching 'tar -xC $destdir': $!";
2430   }
2431   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2432   # get SIGPIPE.  We must feed it data incrementally.
2433   my $tar_input;
2434   while (read(DATA, $tar_input, 65536)) {
2435     print TARX $tar_input;
2436   }
2437   if(!close(TARX)) {
2438     die "'tar -xC $destdir' exited $?: $!";
2439   }
2440 };
2441
2442 mkdir $install_dir;
2443
2444 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2445 if (-d $sdk_root) {
2446   foreach my $sdk_lang (("python",
2447                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2448     if (-d "$sdk_root/$sdk_lang") {
2449       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2450         die "Failed to install $sdk_lang SDK: $!";
2451       }
2452     }
2453   }
2454 }
2455
2456 my $python_dir = "$install_dir/python";
2457 if ((-d $python_dir) and can_run("python2.7")) {
2458   open(my $egg_info_pipe, "-|",
2459        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2460   my @egg_info_errors = <$egg_info_pipe>;
2461   close($egg_info_pipe);
2462
2463   if ($?) {
2464     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2465       # egg_info apparently failed because it couldn't ask git for a build tag.
2466       # Specify no build tag.
2467       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2468       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2469       close($pysdk_cfg);
2470     } else {
2471       my $egg_info_exit = $? >> 8;
2472       foreach my $errline (@egg_info_errors) {
2473         warn $errline;
2474       }
2475       warn "python setup.py egg_info failed: exit $egg_info_exit";
2476       exit ($egg_info_exit || 1);
2477     }
2478   }
2479 }
2480
2481 # Hide messages from the install script (unless it fails: shell_or_die
2482 # will show $destdir.log in that case).
2483 open(STDOUT, ">>", "$destdir.log");
2484 open(STDERR, ">&", STDOUT);
2485
2486 if (-e "$destdir/crunch_scripts/install") {
2487     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2488 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2489     # Old version
2490     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2491 } elsif (-e "./install.sh") {
2492     shell_or_die (undef, "./install.sh", $install_dir);
2493 }
2494
2495 if ($archive_hash) {
2496     unlink "$destdir.archive_hash.new";
2497     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2498     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2499 }
2500
2501 close L;
2502
2503 sub can_run {
2504   my $command_name = shift;
2505   open(my $which, "-|", "which", $command_name);
2506   while (<$which>) { }
2507   close($which);
2508   return ($? == 0);
2509 }
2510
2511 sub shell_or_die
2512 {
2513   my $exitcode = shift;
2514
2515   if ($ENV{"DEBUG"}) {
2516     print STDERR "@_\n";
2517   }
2518   if (system (@_) != 0) {
2519     my $err = $!;
2520     my $code = $?;
2521     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2522     open STDERR, ">&STDERR_ORIG";
2523     system ("cat $destdir.log >&2");
2524     warn "@_ failed ($err): $exitstatus";
2525     if (defined($exitcode)) {
2526       exit $exitcode;
2527     }
2528     else {
2529       exit (($code >> 8) || 1);
2530     }
2531   }
2532 }
2533
2534 __DATA__