Merge branch '12533-token-n-ip-logging'
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
7
8 =head1 NAME
9
10 crunch-job: Execute job steps, save snapshots as requested, collate output.
11
12 =head1 SYNOPSIS
13
14 Obtain job details from Arvados, run tasks on compute nodes (typically
15 invoked by scheduler on controller):
16
17  crunch-job --job x-y-z --git-dir /path/to/repo/.git
18
19 Obtain job details from command line, run tasks on local machine
20 (typically invoked by application or developer on VM):
21
22  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
23
24  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
25
26 =head1 OPTIONS
27
28 =over
29
30 =item --force-unlock
31
32 If the job is already locked, steal the lock and run it anyway.
33
34 =item --git-dir
35
36 Path to a .git directory (or a git URL) where the commit given in the
37 job's C<script_version> attribute is to be found. If this is I<not>
38 given, the job's C<repository> attribute will be used.
39
40 =item --job-api-token
41
42 Arvados API authorization token to use during the course of the job.
43
44 =item --no-clear-tmp
45
46 Do not clear per-job/task temporary directories during initial job
47 setup. This can speed up development and debugging when running jobs
48 locally.
49
50 =item --job
51
52 UUID of the job to run, or a JSON-encoded job resource without a
53 UUID. If the latter is given, a new job object will be created.
54
55 =back
56
57 =head1 RUNNING JOBS LOCALLY
58
59 crunch-job's log messages appear on stderr along with the job tasks'
60 stderr streams. The log is saved in Keep at each checkpoint and when
61 the job finishes.
62
63 If the job succeeds, the job's output locator is printed on stdout.
64
65 While the job is running, the following signals are accepted:
66
67 =over
68
69 =item control-C, SIGINT, SIGQUIT
70
71 Save a checkpoint, terminate any job tasks that are running, and stop.
72
73 =item SIGALRM
74
75 Save a checkpoint and continue.
76
77 =item SIGHUP
78
79 Refresh node allocation (i.e., check whether any nodes have been added
80 or unallocated) and attributes of the Job record that should affect
81 behavior (e.g., cancel job if cancelled_at becomes non-nil).
82
83 =back
84
85 =cut
86
87
88 use strict;
89 use POSIX ':sys_wait_h';
90 use POSIX qw(strftime);
91 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
92 use Arvados;
93 use Cwd qw(realpath);
94 use Data::Dumper;
95 use Digest::MD5 qw(md5_hex);
96 use Getopt::Long;
97 use IPC::Open2;
98 use IO::Select;
99 use File::Temp;
100 use Fcntl ':flock';
101 use File::Path qw( make_path remove_tree );
102
103 use constant TASK_TEMPFAIL => 111;
104 use constant EX_TEMPFAIL => 75;
105 use constant EX_RETRY_UNLOCKED => 93;
106
107 $ENV{"TMPDIR"} ||= "/tmp";
108 unless (defined $ENV{"CRUNCH_TMP"}) {
109   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
110   if ($ENV{"USER"} ne "crunch" && $< != 0) {
111     # use a tmp dir unique for my uid
112     $ENV{"CRUNCH_TMP"} .= "-$<";
113   }
114 }
115
116 # Create the tmp directory if it does not exist
117 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
118   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
119 }
120
121 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
122 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
123 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
124 mkdir ($ENV{"JOB_WORK"});
125
126 my %proc;
127 my $force_unlock;
128 my $git_dir;
129 my $jobspec;
130 my $job_api_token;
131 my $no_clear_tmp;
132 my $resume_stash;
133 my $cgroup_root = "/sys/fs/cgroup";
134 my $docker_bin = "docker.io";
135 my $docker_run_args = "";
136 GetOptions('force-unlock' => \$force_unlock,
137            'git-dir=s' => \$git_dir,
138            'job=s' => \$jobspec,
139            'job-api-token=s' => \$job_api_token,
140            'no-clear-tmp' => \$no_clear_tmp,
141            'resume-stash=s' => \$resume_stash,
142            'cgroup-root=s' => \$cgroup_root,
143            'docker-bin=s' => \$docker_bin,
144            'docker-run-args=s' => \$docker_run_args,
145     );
146
147 if (defined $job_api_token) {
148   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
149 }
150
151 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
152
153
154 $SIG{'USR1'} = sub
155 {
156   $main::ENV{CRUNCH_DEBUG} = 1;
157 };
158 $SIG{'USR2'} = sub
159 {
160   $main::ENV{CRUNCH_DEBUG} = 0;
161 };
162
163 my $arv = Arvados->new('apiVersion' => 'v1');
164
165 my $Job;
166 my $job_id;
167 my $dbh;
168 my $sth;
169 my @jobstep;
170
171 my $local_job;
172 if ($jobspec =~ /^[-a-z\d]+$/)
173 {
174   # $jobspec is an Arvados UUID, not a JSON job specification
175   $Job = api_call("jobs/get", uuid => $jobspec);
176   $local_job = 0;
177 }
178 else
179 {
180   $local_job = JSON::decode_json($jobspec);
181 }
182
183
184 # Make sure our workers (our slurm nodes, localhost, or whatever) are
185 # at least able to run basic commands: they aren't down or severely
186 # misconfigured.
187 my $cmd = ['true'];
188 if (($Job || $local_job)->{docker_image_locator}) {
189   $cmd = [$docker_bin, 'ps', '-q'];
190 }
191 Log(undef, "Sanity check is `@$cmd`");
192 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
193   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
194   $cmd,
195   {label => "sanity check"});
196 if ($exited != 0) {
197   Log(undef, "Sanity check failed: ".exit_status_s($exited));
198   exit EX_TEMPFAIL;
199 }
200 Log(undef, "Sanity check OK");
201
202
203 my $User = api_call("users/current");
204
205 if (!$local_job) {
206   if (!$force_unlock) {
207     # Claim this job, and make sure nobody else does
208     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
209     if ($@) {
210       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
211       exit EX_TEMPFAIL;
212     };
213   }
214 }
215 else
216 {
217   if (!$resume_stash)
218   {
219     map { croak ("No $_ specified") unless $local_job->{$_} }
220     qw(script script_version script_parameters);
221   }
222
223   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
224   $local_job->{'started_at'} = gmtime;
225   $local_job->{'state'} = 'Running';
226
227   $Job = api_call("jobs/create", job => $local_job);
228 }
229 $job_id = $Job->{'uuid'};
230
231 my $keep_logfile = $job_id . '.log.txt';
232 log_writer_start($keep_logfile);
233
234 $Job->{'runtime_constraints'} ||= {};
235 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
236 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
237
238 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
239 if ($? == 0) {
240   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
241   chomp($gem_versions);
242   chop($gem_versions);  # Closing parentheses
243 } else {
244   $gem_versions = "";
245 }
246 Log(undef,
247     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
248
249 Log (undef, "check slurm allocation");
250 my @slot;
251 my @node;
252 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
253 my @sinfo;
254 if (!$have_slurm)
255 {
256   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
257   push @sinfo, "$localcpus localhost";
258 }
259 if (exists $ENV{SLURM_NODELIST})
260 {
261   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
262 }
263 foreach (@sinfo)
264 {
265   my ($ncpus, $slurm_nodelist) = split;
266   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
267
268   my @nodelist;
269   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
270   {
271     my $nodelist = $1;
272     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
273     {
274       my $ranges = $1;
275       foreach (split (",", $ranges))
276       {
277         my ($a, $b);
278         if (/(\d+)-(\d+)/)
279         {
280           $a = $1;
281           $b = $2;
282         }
283         else
284         {
285           $a = $_;
286           $b = $_;
287         }
288         push @nodelist, map {
289           my $n = $nodelist;
290           $n =~ s/\[[-,\d]+\]/$_/;
291           $n;
292         } ($a..$b);
293       }
294     }
295     else
296     {
297       push @nodelist, $nodelist;
298     }
299   }
300   foreach my $nodename (@nodelist)
301   {
302     Log (undef, "node $nodename - $ncpus slots");
303     my $node = { name => $nodename,
304                  ncpus => $ncpus,
305                  # The number of consecutive times a task has been dispatched
306                  # to this node and failed.
307                  losing_streak => 0,
308                  # The number of consecutive times that SLURM has reported
309                  # a node failure since the last successful task.
310                  fail_count => 0,
311                  # Don't dispatch work to this node until this time
312                  # (in seconds since the epoch) has passed.
313                  hold_until => 0 };
314     foreach my $cpu (1..$ncpus)
315     {
316       push @slot, { node => $node,
317                     cpu => $cpu };
318     }
319   }
320   push @node, @nodelist;
321 }
322
323
324
325 # Ensure that we get one jobstep running on each allocated node before
326 # we start overloading nodes with concurrent steps
327
328 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
329
330
331 $Job->update_attributes(
332   'tasks_summary' => { 'failed' => 0,
333                        'todo' => 1,
334                        'running' => 0,
335                        'done' => 0 });
336
337 Log (undef, "start");
338 $SIG{'INT'} = sub { $main::please_freeze = 1; };
339 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
340 $SIG{'TERM'} = \&croak;
341 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
342 $SIG{'ALRM'} = sub { $main::please_info = 1; };
343 $SIG{'CONT'} = sub { $main::please_continue = 1; };
344 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
345
346 $main::please_freeze = 0;
347 $main::please_info = 0;
348 $main::please_continue = 0;
349 $main::please_refresh = 0;
350 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
351
352 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
353 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
354 $ENV{"JOB_UUID"} = $job_id;
355
356
357 my @jobstep_todo = ();
358 my @jobstep_done = ();
359 my @jobstep_tomerge = ();
360 my $jobstep_tomerge_level = 0;
361 my $squeue_checked = 0;
362 my $sinfo_checked = 0;
363 my $latest_refresh = scalar time;
364
365
366
367 if (defined $Job->{thawedfromkey})
368 {
369   thaw ($Job->{thawedfromkey});
370 }
371 else
372 {
373   my $first_task = api_call("job_tasks/create", job_task => {
374     'job_uuid' => $Job->{'uuid'},
375     'sequence' => 0,
376     'qsequence' => 0,
377     'parameters' => {},
378   });
379   push @jobstep, { 'level' => 0,
380                    'failures' => 0,
381                    'arvados_task' => $first_task,
382                  };
383   push @jobstep_todo, 0;
384 }
385
386
387 if (!$have_slurm)
388 {
389   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
390 }
391
392 my $build_script = handle_readall(\*DATA);
393 my $nodelist = join(",", @node);
394 my $git_tar_count = 0;
395
396 if (!defined $no_clear_tmp) {
397   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
398   # up work directories crunch_tmp/work, crunch_tmp/opt,
399   # crunch_tmp/src*.
400   my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
401     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
402     ['bash', '-ec', q{
403 arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
404 rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
405     }],
406     {label => "clean work dirs"});
407   if ($exited != 0) {
408     exit_retry_unlocked();
409   }
410 }
411
412 # If this job requires a Docker image, install that.
413 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
414 if ($docker_locator = $Job->{docker_image_locator}) {
415   Log (undef, "Install docker image $docker_locator");
416   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
417   if (!$docker_hash)
418   {
419     croak("No Docker image hash found from locator $docker_locator");
420   }
421   Log (undef, "docker image hash is $docker_hash");
422   $docker_stream =~ s/^\.//;
423   my $docker_install_script = qq{
424 loaded() {
425   id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
426   echo "image ID is \$id"
427   [[ \${id} = \Q$docker_hash\E ]]
428 }
429 if loaded >&2 2>/dev/null; then
430   echo >&2 "image is already present"
431   exit 0
432 fi
433 echo >&2 "docker image is not present; loading"
434 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
435 if ! loaded >&2; then
436   echo >&2 "`docker load` exited 0, but image is not found (!)"
437   exit 1
438 fi
439 echo >&2 "image loaded successfully"
440 };
441
442   my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
443     ["srun", "--nodelist=" . join(',', @node)],
444     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
445     {label => "load docker image"});
446   if ($exited != 0)
447   {
448     exit_retry_unlocked();
449   }
450
451   # Determine whether this version of Docker supports memory+swap limits.
452   ($exited, $stdout, $stderr, $tempfail) = srun_sync(
453     ["srun", "--nodes=1"],
454     [$docker_bin, 'run', '--help'],
455     {label => "check --memory-swap feature"});
456   if ($tempfail) {
457     exit_retry_unlocked();
458   }
459   $docker_limitmem = ($stdout =~ /--memory-swap/);
460
461   # Find a non-root Docker user to use.
462   # Tries the default user for the container, then 'crunch', then 'nobody',
463   # testing for whether the actual user id is non-zero.  This defends against
464   # mistakes but not malice, but we intend to harden the security in the future
465   # so we don't want anyone getting used to their jobs running as root in their
466   # Docker containers.
467   my @tryusers = ("", "crunch", "nobody");
468   foreach my $try_user (@tryusers) {
469     my $label;
470     my $try_user_arg;
471     if ($try_user eq "") {
472       $label = "check whether default user is UID 0";
473       $try_user_arg = "";
474     } else {
475       $label = "check whether user '$try_user' is UID 0";
476       $try_user_arg = "--user=$try_user";
477     }
478     my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
479       ["srun", "--nodes=1"],
480       ["/bin/sh", "-ec",
481        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
482       {label => $label});
483     chomp($stdout);
484     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
485       $dockeruserarg = $try_user_arg;
486       if ($try_user eq "") {
487         Log(undef, "Container will run with default user");
488       } else {
489         Log(undef, "Container will run with $dockeruserarg");
490       }
491       last;
492     } elsif ($tempfail) {
493       exit_retry_unlocked();
494     }
495   }
496
497   if (!defined $dockeruserarg) {
498     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
499   }
500
501   if ($Job->{arvados_sdk_version}) {
502     # The job also specifies an Arvados SDK version.  Add the SDKs to the
503     # tar file for the build script to install.
504     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
505                        $Job->{arvados_sdk_version}));
506     add_git_archive("git", "--git-dir=$git_dir", "archive",
507                     "--prefix=.arvados.sdk/",
508                     $Job->{arvados_sdk_version}, "sdk");
509   }
510 }
511
512 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
513   # If script_version looks like an absolute path, *and* the --git-dir
514   # argument was not given -- which implies we were not invoked by
515   # crunch-dispatch -- we will use the given path as a working
516   # directory instead of resolving script_version to a git commit (or
517   # doing anything else with git).
518   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
519   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
520 }
521 else {
522   # Resolve the given script_version to a git commit sha1. Also, if
523   # the repository is remote, clone it into our local filesystem: this
524   # ensures "git archive" will work, and is necessary to reliably
525   # resolve a symbolic script_version like "master^".
526   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
527
528   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
529
530   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
531
532   # If we're running under crunch-dispatch, it will have already
533   # pulled the appropriate source tree into its own repository, and
534   # given us that repo's path as $git_dir.
535   #
536   # If we're running a "local" job, we might have to fetch content
537   # from a remote repository.
538   #
539   # (Currently crunch-dispatch gives a local path with --git-dir, but
540   # we might as well accept URLs there too in case it changes its
541   # mind.)
542   my $repo = $git_dir || $Job->{'repository'};
543
544   # Repository can be remote or local. If remote, we'll need to fetch it
545   # to a local dir before doing `git log` et al.
546   my $repo_location;
547
548   if ($repo =~ m{://|^[^/]*:}) {
549     # $repo is a git url we can clone, like git:// or https:// or
550     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
551     # not recognized here because distinguishing that from a local
552     # path is too fragile. If you really need something strange here,
553     # use the ssh:// form.
554     $repo_location = 'remote';
555   } elsif ($repo =~ m{^\.*/}) {
556     # $repo is a local path to a git index. We'll also resolve ../foo
557     # to ../foo/.git if the latter is a directory. To help
558     # disambiguate local paths from named hosted repositories, this
559     # form must be given as ./ or ../ if it's a relative path.
560     if (-d "$repo/.git") {
561       $repo = "$repo/.git";
562     }
563     $repo_location = 'local';
564   } else {
565     # $repo is none of the above. It must be the name of a hosted
566     # repository.
567     my $arv_repo_list = api_call("repositories/list",
568                                  'filters' => [['name','=',$repo]]);
569     my @repos_found = @{$arv_repo_list->{'items'}};
570     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
571     if ($n_found > 0) {
572       Log(undef, "Repository '$repo' -> "
573           . join(", ", map { $_->{'uuid'} } @repos_found));
574     }
575     if ($n_found != 1) {
576       croak("Error: Found $n_found repositories with name '$repo'.");
577     }
578     $repo = $repos_found[0]->{'fetch_url'};
579     $repo_location = 'remote';
580   }
581   Log(undef, "Using $repo_location repository '$repo'");
582   $ENV{"CRUNCH_SRC_URL"} = $repo;
583
584   # Resolve given script_version (we'll call that $treeish here) to a
585   # commit sha1 ($commit).
586   my $treeish = $Job->{'script_version'};
587   my $commit;
588   if ($repo_location eq 'remote') {
589     # We minimize excess object-fetching by re-using the same bare
590     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
591     # just keep adding remotes to it as needed.
592     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
593     my $gitcmd = "git --git-dir=\Q$local_repo\E";
594
595     # Set up our local repo for caching remote objects, making
596     # archives, etc.
597     if (!-d $local_repo) {
598       make_path($local_repo) or croak("Error: could not create $local_repo");
599     }
600     # This works (exits 0 and doesn't delete fetched objects) even
601     # if $local_repo is already initialized:
602     `$gitcmd init --bare`;
603     if ($?) {
604       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
605     }
606
607     # If $treeish looks like a hash (or abbrev hash) we look it up in
608     # our local cache first, since that's cheaper. (We don't want to
609     # do that with tags/branches though -- those change over time, so
610     # they should always be resolved by the remote repo.)
611     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
612       # Hide stderr because it's normal for this to fail:
613       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
614       if ($? == 0 &&
615           # Careful not to resolve a branch named abcdeff to commit 1234567:
616           $sha1 =~ /^$treeish/ &&
617           $sha1 =~ /^([0-9a-f]{40})$/s) {
618         $commit = $1;
619         Log(undef, "Commit $commit already present in $local_repo");
620       }
621     }
622
623     if (!defined $commit) {
624       # If $treeish isn't just a hash or abbrev hash, or isn't here
625       # yet, we need to fetch the remote to resolve it correctly.
626
627       # First, remove all local heads. This prevents a name that does
628       # not exist on the remote from resolving to (or colliding with)
629       # a previously fetched branch or tag (possibly from a different
630       # remote).
631       remove_tree("$local_repo/refs/heads", {keep_root => 1});
632
633       Log(undef, "Fetching objects from $repo to $local_repo");
634       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
635       if ($?) {
636         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
637       }
638     }
639
640     # Now that the data is all here, we will use our local repo for
641     # the rest of our git activities.
642     $repo = $local_repo;
643   }
644
645   my $gitcmd = "git --git-dir=\Q$repo\E";
646   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
647   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
648     croak("`$gitcmd rev-list` exited "
649           .exit_status_s($?)
650           .", '$treeish' not found, giving up");
651   }
652   $commit = $1;
653   Log(undef, "Version $treeish is commit $commit");
654
655   if ($commit ne $Job->{'script_version'}) {
656     # Record the real commit id in the database, frozentokey, logs,
657     # etc. -- instead of an abbreviation or a branch name which can
658     # become ambiguous or point to a different commit in the future.
659     if (!$Job->update_attributes('script_version' => $commit)) {
660       croak("Error: failed to update job's script_version attribute");
661     }
662   }
663
664   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
665   add_git_archive("$gitcmd archive ''\Q$commit\E");
666 }
667
668 my $git_archive = combined_git_archive();
669 if (!defined $git_archive) {
670   Log(undef, "Skip install phase (no git archive)");
671   if ($have_slurm) {
672     Log(undef, "Warning: This probably means workers have no source tree!");
673   }
674 }
675 else {
676   my $exited;
677   my $install_script_tries_left = 3;
678   for (my $attempts = 0; $attempts < 3; $attempts++) {
679     my @srunargs = ("srun",
680                     "--nodelist=$nodelist",
681                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
682     my @execargs = ("sh", "-c",
683                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
684
685     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
686     my ($stdout, $stderr, $tempfail);
687     ($exited, $stdout, $stderr, $tempfail) = srun_sync(
688       \@srunargs, \@execargs,
689       {label => "run install script on all workers"},
690         $build_script . $git_archive);
691     if ($tempfail) {
692       exit_retry_unlocked();
693     }
694
695     my $stderr_anything_from_script = 0;
696     for my $line (split(/\n/, $stderr)) {
697       if ($line !~ /^(srun: error: |starting: \[)/) {
698         $stderr_anything_from_script = 1;
699       }
700     }
701
702     last if $exited == 0 || $main::please_freeze;
703
704     # If the install script fails but doesn't print an error message,
705     # the next thing anyone is likely to do is just run it again in
706     # case it was a transient problem like "slurm communication fails
707     # because the network isn't reliable enough". So we'll just do
708     # that ourselves (up to 3 attempts in total). OTOH, if there is an
709     # error message, the problem is more likely to have a real fix and
710     # we should fail the job so the fixing process can start, instead
711     # of doing 2 more attempts.
712     last if $stderr_anything_from_script;
713   }
714
715   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
716     unlink($tar_filename);
717   }
718
719   if ($exited != 0) {
720     croak("Giving up");
721   }
722 }
723
724 foreach (qw (script script_version script_parameters runtime_constraints))
725 {
726   Log (undef,
727        "$_ " .
728        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
729 }
730 foreach (split (/\n/, $Job->{knobs}))
731 {
732   Log (undef, "knob " . $_);
733 }
734 my $resp = api_call(
735   'nodes/list',
736   'filters' => [['hostname', 'in', \@node]],
737   'order' => 'hostname',
738   'limit' => scalar(@node),
739     );
740 for my $n (@{$resp->{items}}) {
741   Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
742 }
743
744
745
746 $main::success = undef;
747
748
749
750 ONELEVEL:
751
752 my $thisround_succeeded = 0;
753 my $thisround_failed = 0;
754 my $thisround_failed_multiple = 0;
755 my $working_slot_count = scalar(@slot);
756
757 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
758                        or $a <=> $b } @jobstep_todo;
759 my $level = $jobstep[$jobstep_todo[0]]->{level};
760
761 my $initial_tasks_this_level = 0;
762 foreach my $id (@jobstep_todo) {
763   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
764 }
765
766 # If the number of tasks scheduled at this level #T is smaller than the number
767 # of slots available #S, only use the first #T slots, or the first slot on
768 # each node, whichever number is greater.
769 #
770 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
771 # based on these numbers.  Using fewer slots makes more resources available
772 # to each individual task, which should normally be a better strategy when
773 # there are fewer of them running with less parallelism.
774 #
775 # Note that this calculation is not redone if the initial tasks at
776 # this level queue more tasks at the same level.  This may harm
777 # overall task throughput for that level.
778 my @freeslot;
779 if ($initial_tasks_this_level < @node) {
780   @freeslot = (0..$#node);
781 } elsif ($initial_tasks_this_level < @slot) {
782   @freeslot = (0..$initial_tasks_this_level - 1);
783 } else {
784   @freeslot = (0..$#slot);
785 }
786 my $round_num_freeslots = scalar(@freeslot);
787 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
788
789 my %round_max_slots = ();
790 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
791   my $this_slot = $slot[$freeslot[$ii]];
792   my $node_name = $this_slot->{node}->{name};
793   $round_max_slots{$node_name} ||= $this_slot->{cpu};
794   last if (scalar(keys(%round_max_slots)) >= @node);
795 }
796
797 Log(undef, "start level $level with $round_num_freeslots slots");
798 my @holdslot;
799 my %reader;
800 my $progress_is_dirty = 1;
801 my $progress_stats_updated = 0;
802
803 update_progress_stats();
804
805
806 THISROUND:
807 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
808 {
809   # Don't create new tasks if we already know the job's final result.
810   last if defined($main::success);
811
812   my $id = $jobstep_todo[$todo_ptr];
813   my $Jobstep = $jobstep[$id];
814   if ($Jobstep->{level} != $level)
815   {
816     next;
817   }
818
819   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
820   set_nonblocking($reader{$id});
821
822   my $childslot = $freeslot[0];
823   my $childnode = $slot[$childslot]->{node};
824   my $childslotname = join (".",
825                             $slot[$childslot]->{node}->{name},
826                             $slot[$childslot]->{cpu});
827
828   my $childpid = fork();
829   if ($childpid == 0)
830   {
831     $SIG{'INT'} = 'DEFAULT';
832     $SIG{'QUIT'} = 'DEFAULT';
833     $SIG{'TERM'} = 'DEFAULT';
834
835     foreach (values (%reader))
836     {
837       close($_);
838     }
839     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
840     open(STDOUT,">&writer") or croak ($!);
841     open(STDERR,">&writer") or croak ($!);
842
843     undef $dbh;
844     undef $sth;
845
846     delete $ENV{"GNUPGHOME"};
847     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
848     $ENV{"TASK_QSEQUENCE"} = $id;
849     $ENV{"TASK_SEQUENCE"} = $level;
850     $ENV{"JOB_SCRIPT"} = $Job->{script};
851     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
852       $param =~ tr/a-z/A-Z/;
853       $ENV{"JOB_PARAMETER_$param"} = $value;
854     }
855     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
856     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
857     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
858     $ENV{"HOME"} = $ENV{"TASK_WORK"};
859     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
860     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
861     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
862
863     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
864
865     $ENV{"GZIP"} = "-n";
866
867     my @srunargs = (
868       "srun",
869       "--nodelist=".$childnode->{name},
870       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
871       "--job-name=$job_id.$id.$$",
872         );
873
874     my $stdbuf = " stdbuf --output=0 --error=0 ";
875
876     my $arv_file_cache = "";
877     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
878       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
879     }
880
881     my $command =
882         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
883         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
884         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
885         # These environment variables get used explicitly later in
886         # $command.  No tool is expected to read these values directly.
887         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
888         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
889         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
890         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
891         .q{&& declare -a VOLUMES=() }
892         .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
893         .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
894         .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
895
896     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
897     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
898     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
899
900     if ($docker_hash)
901     {
902       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
903       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
904       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
905       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
906       # We only set memory limits if Docker lets us limit both memory and swap.
907       # Memory limits alone have been supported longer, but subprocesses tend
908       # to get SIGKILL if they exceed that without any swap limit set.
909       # See #5642 for additional background.
910       if ($docker_limitmem) {
911         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
912       }
913
914       # The source tree and $destdir directory (which we have
915       # installed on the worker host) are available in the container,
916       # under the same path.
917       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
918       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
919
920       # Currently, we make the "by_pdh" directory in arv-mount's mount
921       # point appear at /keep inside the container (instead of using
922       # the same path as the host like we do with CRUNCH_SRC and
923       # CRUNCH_INSTALL). However, crunch scripts and utilities must
924       # not rely on this. They must use $TASK_KEEPMOUNT.
925       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
926       $ENV{TASK_KEEPMOUNT} = "/keep";
927
928       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
929       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
930       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
931
932       # TASK_WORK is almost exactly like a docker data volume: it
933       # starts out empty, is writable, and persists until no
934       # containers use it any more. We don't use --volumes-from to
935       # share it with other containers: it is only accessible to this
936       # task, and it goes away when this task stops.
937       #
938       # However, a docker data volume is writable only by root unless
939       # the mount point already happens to exist in the container with
940       # different permissions. Therefore, we [1] assume /tmp already
941       # exists in the image and is writable by the crunch user; [2]
942       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
943       # writable if they are created by docker while setting up the
944       # other --volumes); and [3] create $TASK_WORK inside the
945       # container using $build_script.
946       $command .= "--volume=/tmp ";
947       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
948       $ENV{"HOME"} = $ENV{"TASK_WORK"};
949       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
950
951       # TODO: Share a single JOB_WORK volume across all task
952       # containers on a given worker node, and delete it when the job
953       # ends (and, in case that doesn't work, when the next job
954       # starts).
955       #
956       # For now, use the same approach as TASK_WORK above.
957       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
958
959       # Bind mount the crunchrunner binary and host TLS certificates file into
960       # the container.
961       $command .= '"${VOLUMES[@]}" ';
962
963       while (my ($env_key, $env_val) = each %ENV)
964       {
965         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
966           $command .= "--env=\Q$env_key=$env_val\E ";
967         }
968       }
969       $command .= "--env=\QHOME=$ENV{HOME}\E ";
970       $command .= "\Q$docker_hash\E ";
971
972       if ($Job->{arvados_sdk_version}) {
973         $command .= $stdbuf;
974         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
975       } else {
976         $command .= "/bin/sh -c \'python -c " .
977             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
978             ">&2 2>/dev/null; " .
979             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
980             "if which stdbuf >/dev/null ; then " .
981             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
982             " else " .
983             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
984             " fi\'";
985       }
986     } else {
987       # Non-docker run
988       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
989       $command .= $stdbuf;
990       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
991     }
992
993     my @execargs = ('bash', '-c', $command);
994     srun (\@srunargs, \@execargs, undef, $build_script);
995     # exec() failed, we assume nothing happened.
996     die "srun() failed on build script\n";
997   }
998   close("writer");
999   if (!defined $childpid)
1000   {
1001     close $reader{$id};
1002     delete $reader{$id};
1003     next;
1004   }
1005   shift @freeslot;
1006   $proc{$childpid} = {
1007     jobstepidx => $id,
1008     time => time,
1009     slot => $childslot,
1010     jobstepname => "$job_id.$id.$childpid",
1011   };
1012   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1013   $slot[$childslot]->{pid} = $childpid;
1014
1015   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1016   Log ($id, "child $childpid started on $childslotname");
1017   $Jobstep->{starttime} = time;
1018   $Jobstep->{node} = $childnode->{name};
1019   $Jobstep->{slotindex} = $childslot;
1020   delete $Jobstep->{stderr};
1021   delete $Jobstep->{finishtime};
1022   delete $Jobstep->{tempfail};
1023
1024   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1025   retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1026
1027   splice @jobstep_todo, $todo_ptr, 1;
1028   --$todo_ptr;
1029
1030   $progress_is_dirty = 1;
1031
1032   while (!@freeslot
1033          ||
1034          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1035   {
1036     last THISROUND if $main::please_freeze;
1037     if ($main::please_info)
1038     {
1039       $main::please_info = 0;
1040       freeze();
1041       create_output_collection();
1042       save_meta(1);
1043       update_progress_stats();
1044     }
1045     my $gotsome
1046         = readfrompipes ()
1047         + reapchildren ();
1048     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1049     {
1050       check_refresh_wanted();
1051       check_squeue();
1052       update_progress_stats();
1053     }
1054     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1055     {
1056       update_progress_stats();
1057     }
1058     if (!$gotsome) {
1059       select (undef, undef, undef, 0.1);
1060     }
1061     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1062                                         $_->{node}->{hold_count} < 4 } @slot);
1063     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1064         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1065     {
1066       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1067           .($thisround_failed+$thisround_succeeded)
1068           .") -- giving up on this round";
1069       Log (undef, $message);
1070       last THISROUND;
1071     }
1072
1073     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1074     for (my $i=$#freeslot; $i>=0; $i--) {
1075       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1076         push @holdslot, (splice @freeslot, $i, 1);
1077       }
1078     }
1079     for (my $i=$#holdslot; $i>=0; $i--) {
1080       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1081         push @freeslot, (splice @holdslot, $i, 1);
1082       }
1083     }
1084
1085     # give up if no nodes are succeeding
1086     if ($working_slot_count < 1) {
1087       Log(undef, "Every node has failed -- giving up");
1088       last THISROUND;
1089     }
1090   }
1091 }
1092
1093
1094 push @freeslot, splice @holdslot;
1095 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1096
1097
1098 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1099 while (%proc)
1100 {
1101   if ($main::please_continue) {
1102     $main::please_continue = 0;
1103     goto THISROUND;
1104   }
1105   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1106   readfrompipes ();
1107   if (!reapchildren())
1108   {
1109     check_refresh_wanted();
1110     check_squeue();
1111     update_progress_stats();
1112     select (undef, undef, undef, 0.1);
1113     killem (keys %proc) if $main::please_freeze;
1114   }
1115 }
1116
1117 update_progress_stats();
1118 freeze_if_want_freeze();
1119
1120
1121 if (!defined $main::success)
1122 {
1123   if (!@jobstep_todo) {
1124     $main::success = 1;
1125   } elsif ($working_slot_count < 1) {
1126     save_output_collection();
1127     save_meta();
1128     exit_retry_unlocked();
1129   } elsif ($thisround_succeeded == 0 &&
1130            ($thisround_failed == 0 || $thisround_failed > 4)) {
1131     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1132     Log (undef, $message);
1133     $main::success = 0;
1134   }
1135 }
1136
1137 goto ONELEVEL if !defined $main::success;
1138
1139
1140 release_allocation();
1141 freeze();
1142 my $collated_output = save_output_collection();
1143 Log (undef, "finish");
1144
1145 my $final_log = save_meta();
1146
1147 my $final_state;
1148 if ($collated_output && $final_log && $main::success) {
1149   $final_state = 'Complete';
1150 } else {
1151   $final_state = 'Failed';
1152 }
1153 $Job->update_attributes('state' => $final_state);
1154
1155 exit (($final_state eq 'Complete') ? 0 : 1);
1156
1157
1158
1159 sub update_progress_stats
1160 {
1161   $progress_stats_updated = time;
1162   return if !$progress_is_dirty;
1163   my ($todo, $done, $running) = (scalar @jobstep_todo,
1164                                  scalar @jobstep_done,
1165                                  scalar keys(%proc));
1166   $Job->{'tasks_summary'} ||= {};
1167   $Job->{'tasks_summary'}->{'todo'} = $todo;
1168   $Job->{'tasks_summary'}->{'done'} = $done;
1169   $Job->{'tasks_summary'}->{'running'} = $running;
1170   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1171   Log (undef, "status: $done done, $running running, $todo todo");
1172   $progress_is_dirty = 0;
1173 }
1174
1175
1176
1177 sub reapchildren
1178 {
1179   my $children_reaped = 0;
1180   my @successful_task_uuids = ();
1181
1182   while((my $pid = waitpid (-1, WNOHANG)) > 0)
1183   {
1184     my $childstatus = $?;
1185
1186     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1187                     . "."
1188                     . $slot[$proc{$pid}->{slot}]->{cpu});
1189     my $jobstepidx = $proc{$pid}->{jobstepidx};
1190
1191     $children_reaped++;
1192     my $elapsed = time - $proc{$pid}->{time};
1193     my $Jobstep = $jobstep[$jobstepidx];
1194
1195     my $exitvalue = $childstatus >> 8;
1196     my $exitinfo = "exit ".exit_status_s($childstatus);
1197     $Jobstep->{'arvados_task'}->reload;
1198     my $task_success = $Jobstep->{'arvados_task'}->{success};
1199
1200     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1201
1202     if (!defined $task_success) {
1203       # task did not indicate one way or the other --> fail
1204       Log($jobstepidx, sprintf(
1205             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1206             exit_status_s($childstatus)));
1207       $Jobstep->{'arvados_task'}->{success} = 0;
1208       retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1209       $task_success = 0;
1210     }
1211
1212     if (!$task_success)
1213     {
1214       my $temporary_fail;
1215       $temporary_fail ||= $Jobstep->{tempfail};
1216       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1217
1218       ++$thisround_failed;
1219       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1220
1221       # Check for signs of a failed or misconfigured node
1222       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1223           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1224         # Don't count this against jobstep failure thresholds if this
1225         # node is already suspected faulty and srun exited quickly
1226         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1227             $elapsed < 5) {
1228           Log ($jobstepidx, "blaming failure on suspect node " .
1229                $slot[$proc{$pid}->{slot}]->{node}->{name});
1230           $temporary_fail ||= 1;
1231         }
1232         ban_node_by_slot($proc{$pid}->{slot});
1233       }
1234
1235       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1236                                 ++$Jobstep->{'failures'},
1237                                 $temporary_fail ? 'temporary' : 'permanent',
1238                                 $elapsed));
1239
1240       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1241         # Give up on this task, and the whole job
1242         $main::success = 0;
1243       }
1244       # Put this task back on the todo queue
1245       push @jobstep_todo, $jobstepidx;
1246       $Job->{'tasks_summary'}->{'failed'}++;
1247     }
1248     else # task_success
1249     {
1250       push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1251       ++$thisround_succeeded;
1252       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1253       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1254       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1255       push @jobstep_done, $jobstepidx;
1256       Log ($jobstepidx, "success in $elapsed seconds");
1257     }
1258     $Jobstep->{exitcode} = $childstatus;
1259     $Jobstep->{finishtime} = time;
1260     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1261     retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1262     process_stderr_final ($jobstepidx);
1263     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1264                               length($Jobstep->{'arvados_task'}->{output}),
1265                               $Jobstep->{'arvados_task'}->{output}));
1266
1267     close $reader{$jobstepidx};
1268     delete $reader{$jobstepidx};
1269     delete $slot[$proc{$pid}->{slot}]->{pid};
1270     push @freeslot, $proc{$pid}->{slot};
1271     delete $proc{$pid};
1272
1273     $progress_is_dirty = 1;
1274   }
1275
1276   if (scalar(@successful_task_uuids) > 0)
1277   {
1278     Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1279     # Load new tasks
1280     my $newtask_list = [];
1281     my $newtask_results;
1282     do {
1283       $newtask_results = api_call(
1284         "job_tasks/list",
1285         'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1286         'order' => 'qsequence',
1287         'offset' => scalar(@$newtask_list),
1288           );
1289       push(@$newtask_list, @{$newtask_results->{items}});
1290     } while (@{$newtask_results->{items}});
1291     Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1292     foreach my $arvados_task (@$newtask_list) {
1293       my $jobstep = {
1294         'level' => $arvados_task->{'sequence'},
1295         'failures' => 0,
1296         'arvados_task' => $arvados_task
1297       };
1298       push @jobstep, $jobstep;
1299       push @jobstep_todo, $#jobstep;
1300     }
1301   }
1302
1303   return $children_reaped;
1304 }
1305
1306 sub check_refresh_wanted
1307 {
1308   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1309   if (@stat &&
1310       $stat[9] > $latest_refresh &&
1311       # ...and we have actually locked the job record...
1312       $job_id eq $Job->{'uuid'}) {
1313     $latest_refresh = scalar time;
1314     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1315     for my $attr ('cancelled_at',
1316                   'cancelled_by_user_uuid',
1317                   'cancelled_by_client_uuid',
1318                   'state') {
1319       $Job->{$attr} = $Job2->{$attr};
1320     }
1321     if ($Job->{'state'} ne "Running") {
1322       if ($Job->{'state'} eq "Cancelled") {
1323         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1324       } else {
1325         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1326       }
1327       $main::success = 0;
1328       $main::please_freeze = 1;
1329     }
1330   }
1331 }
1332
1333 sub check_squeue
1334 {
1335   my $last_squeue_check = $squeue_checked;
1336
1337   # Do not call `squeue` or check the kill list more than once every
1338   # 15 seconds.
1339   return if $last_squeue_check > time - 15;
1340   $squeue_checked = time;
1341
1342   # Look for children from which we haven't received stderr data since
1343   # the last squeue check. If no such children exist, all procs are
1344   # alive and there's no need to even look at squeue.
1345   #
1346   # As long as the crunchstat poll interval (10s) is shorter than the
1347   # squeue check interval (15s) this should make the squeue check an
1348   # infrequent event.
1349   my $silent_procs = 0;
1350   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1351   {
1352     if (!exists($js->{stderr_at}))
1353     {
1354       $js->{stderr_at} = 0;
1355     }
1356     if ($js->{stderr_at} < $last_squeue_check)
1357     {
1358       $silent_procs++;
1359     }
1360   }
1361   return if $silent_procs == 0;
1362
1363   # use killem() on procs whose killtime is reached
1364   while (my ($pid, $procinfo) = each %proc)
1365   {
1366     my $js = $jobstep[$procinfo->{jobstepidx}];
1367     if (exists $procinfo->{killtime}
1368         && $procinfo->{killtime} <= time
1369         && $js->{stderr_at} < $last_squeue_check)
1370     {
1371       my $sincewhen = "";
1372       if ($js->{stderr_at}) {
1373         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1374       }
1375       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1376       killem ($pid);
1377     }
1378   }
1379
1380   if (!$have_slurm)
1381   {
1382     # here is an opportunity to check for mysterious problems with local procs
1383     return;
1384   }
1385
1386   # Get a list of steps still running.  Note: squeue(1) says --steps
1387   # selects a format (which we override anyway) and allows us to
1388   # specify which steps we're interested in (which we don't).
1389   # Importantly, it also changes the meaning of %j from "job name" to
1390   # "step name" and (although this isn't mentioned explicitly in the
1391   # docs) switches from "one line per job" mode to "one line per step"
1392   # mode. Without it, we'd just get a list of one job, instead of a
1393   # list of N steps.
1394   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1395   if ($? != 0)
1396   {
1397     Log(undef, "warning: squeue exit status $? ($!)");
1398     return;
1399   }
1400   chop @squeue;
1401
1402   # which of my jobsteps are running, according to squeue?
1403   my %ok;
1404   for my $jobstepname (@squeue)
1405   {
1406     $ok{$jobstepname} = 1;
1407   }
1408
1409   # Check for child procs >60s old and not mentioned by squeue.
1410   while (my ($pid, $procinfo) = each %proc)
1411   {
1412     if ($procinfo->{time} < time - 60
1413         && $procinfo->{jobstepname}
1414         && !exists $ok{$procinfo->{jobstepname}}
1415         && !exists $procinfo->{killtime})
1416     {
1417       # According to slurm, this task has ended (successfully or not)
1418       # -- but our srun child hasn't exited. First we must wait (30
1419       # seconds) in case this is just a race between communication
1420       # channels. Then, if our srun child process still hasn't
1421       # terminated, we'll conclude some slurm communication
1422       # error/delay has caused the task to die without notifying srun,
1423       # and we'll kill srun ourselves.
1424       $procinfo->{killtime} = time + 30;
1425       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1426     }
1427   }
1428 }
1429
1430 sub check_sinfo
1431 {
1432   # If a node fails in a multi-node "srun" call during job setup, the call
1433   # may hang instead of exiting with a nonzero code.  This function checks
1434   # "sinfo" for the health of the nodes that were allocated and ensures that
1435   # they are all still in the "alloc" state.  If a node that is allocated to
1436   # this job is not in "alloc" state, then set please_freeze.
1437   #
1438   # This is only called from srun_sync() for node configuration.  If a
1439   # node fails doing actual work, there are other recovery mechanisms.
1440
1441   # Do not call `sinfo` more than once every 15 seconds.
1442   return if $sinfo_checked > time - 15;
1443   $sinfo_checked = time;
1444
1445   # The output format "%t" means output node states.
1446   my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
1447   if ($? != 0)
1448   {
1449     Log(undef, "warning: sinfo exit status $? ($!)");
1450     return;
1451   }
1452   chop @sinfo;
1453
1454   foreach (@sinfo)
1455   {
1456     if ($_ != "alloc" && $_ != "alloc*") {
1457       $main::please_freeze = 1;
1458     }
1459   }
1460 }
1461
1462 sub release_allocation
1463 {
1464   if ($have_slurm)
1465   {
1466     Log (undef, "release job allocation");
1467     system "scancel $ENV{SLURM_JOB_ID}";
1468   }
1469 }
1470
1471
1472 sub readfrompipes
1473 {
1474   my $gotsome = 0;
1475   my %fd_job;
1476   my $sel = IO::Select->new();
1477   foreach my $jobstepidx (keys %reader)
1478   {
1479     my $fd = $reader{$jobstepidx};
1480     $sel->add($fd);
1481     $fd_job{$fd} = $jobstepidx;
1482
1483     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1484       $sel->add($stdout_fd);
1485       $fd_job{$stdout_fd} = $jobstepidx;
1486     }
1487   }
1488   # select on all reader fds with 0.1s timeout
1489   my @ready_fds = $sel->can_read(0.1);
1490   foreach my $fd (@ready_fds)
1491   {
1492     my $buf;
1493     if (0 < sysread ($fd, $buf, 65536))
1494     {
1495       $gotsome = 1;
1496       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1497
1498       my $jobstepidx = $fd_job{$fd};
1499       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1500         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1501         next;
1502       }
1503
1504       $jobstep[$jobstepidx]->{stderr_at} = time;
1505       $jobstep[$jobstepidx]->{stderr} .= $buf;
1506
1507       # Consume everything up to the last \n
1508       preprocess_stderr ($jobstepidx);
1509
1510       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1511       {
1512         # If we get a lot of stderr without a newline, chop off the
1513         # front to avoid letting our buffer grow indefinitely.
1514         substr ($jobstep[$jobstepidx]->{stderr},
1515                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1516       }
1517     }
1518   }
1519   return $gotsome;
1520 }
1521
1522
1523 # Consume all full lines of stderr for a jobstep. Everything after the
1524 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1525 # returning.
1526 sub preprocess_stderr
1527 {
1528   my $jobstepidx = shift;
1529   # slotindex is only defined for children running Arvados job tasks.
1530   # Be prepared to handle the undef case (for setup srun calls, etc.).
1531   my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1532
1533   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1534     my $line = $1;
1535     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1536     Log ($jobstepidx, "stderr $line");
1537     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
1538       # If the allocation is revoked, we can't possibly continue, so mark all
1539       # nodes as failed.  This will cause the overall exit code to be
1540       # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
1541       # this job.
1542       $main::please_freeze = 1;
1543       foreach my $st (@slot) {
1544         $st->{node}->{fail_count}++;
1545       }
1546     }
1547     elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
1548       $jobstep[$jobstepidx]->{tempfail} = 1;
1549       if (defined($job_slot_index)) {
1550         $slot[$job_slot_index]->{node}->{fail_count}++;
1551         ban_node_by_slot($job_slot_index);
1552       }
1553     }
1554     elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
1555       $jobstep[$jobstepidx]->{tempfail} = 1;
1556       ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
1557     }
1558     elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1559       $jobstep[$jobstepidx]->{tempfail} = 1;
1560     }
1561   }
1562 }
1563
1564
1565 sub process_stderr_final
1566 {
1567   my $jobstepidx = shift;
1568   preprocess_stderr ($jobstepidx);
1569
1570   map {
1571     Log ($jobstepidx, "stderr $_");
1572   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1573   $jobstep[$jobstepidx]->{stderr} = '';
1574 }
1575
1576 sub fetch_block
1577 {
1578   my $hash = shift;
1579   my $keep;
1580   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1581     Log(undef, "fetch_block run error from arv-get $hash: $!");
1582     return undef;
1583   }
1584   my $output_block = "";
1585   while (1) {
1586     my $buf;
1587     my $bytes = sysread($keep, $buf, 1024 * 1024);
1588     if (!defined $bytes) {
1589       Log(undef, "fetch_block read error from arv-get: $!");
1590       $output_block = undef;
1591       last;
1592     } elsif ($bytes == 0) {
1593       # sysread returns 0 at the end of the pipe.
1594       last;
1595     } else {
1596       # some bytes were read into buf.
1597       $output_block .= $buf;
1598     }
1599   }
1600   close $keep;
1601   if ($?) {
1602     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1603     $output_block = undef;
1604   }
1605   return $output_block;
1606 }
1607
1608 # Create a collection by concatenating the output of all tasks (each
1609 # task's output is either a manifest fragment, a locator for a
1610 # manifest fragment stored in Keep, or nothing at all). Return the
1611 # portable_data_hash of the new collection.
1612 sub create_output_collection
1613 {
1614   Log (undef, "collate");
1615
1616   my ($child_out, $child_in);
1617   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1618 import arvados
1619 import sys
1620 print (arvados.api("v1").collections().
1621        create(body={"manifest_text": sys.stdin.read(),
1622                     "owner_uuid": sys.argv[2]}).
1623        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1624 }, retry_count(), $Job->{owner_uuid});
1625
1626   my $task_idx = -1;
1627   my $manifest_size = 0;
1628   for (@jobstep)
1629   {
1630     ++$task_idx;
1631     my $output = $_->{'arvados_task'}->{output};
1632     next if (!defined($output));
1633     my $next_write;
1634     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1635       $next_write = fetch_block($output);
1636     } else {
1637       $next_write = $output;
1638     }
1639     if (defined($next_write)) {
1640       if (!defined(syswrite($child_in, $next_write))) {
1641         # There's been an error writing.  Stop the loop.
1642         # We'll log details about the exit code later.
1643         last;
1644       } else {
1645         $manifest_size += length($next_write);
1646       }
1647     } else {
1648       my $uuid = $_->{'arvados_task'}->{'uuid'};
1649       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1650       $main::success = 0;
1651     }
1652   }
1653   close($child_in);
1654   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1655
1656   my $joboutput;
1657   my $s = IO::Select->new($child_out);
1658   if ($s->can_read(120)) {
1659     sysread($child_out, $joboutput, 1024 * 1024);
1660     waitpid($pid, 0);
1661     if ($?) {
1662       Log(undef, "output collection creation exited " . exit_status_s($?));
1663       $joboutput = undef;
1664     } else {
1665       chomp($joboutput);
1666     }
1667   } else {
1668     Log (undef, "timed out while creating output collection");
1669     foreach my $signal (2, 2, 2, 15, 15, 9) {
1670       kill($signal, $pid);
1671       last if waitpid($pid, WNOHANG) == -1;
1672       sleep(1);
1673     }
1674   }
1675   close($child_out);
1676
1677   return $joboutput;
1678 }
1679
1680 # Calls create_output_collection, logs the result, and returns it.
1681 # If that was successful, save that as the output in the job record.
1682 sub save_output_collection {
1683   my $collated_output = create_output_collection();
1684
1685   if (!$collated_output) {
1686     Log(undef, "Failed to write output collection");
1687   }
1688   else {
1689     Log(undef, "job output $collated_output");
1690     $Job->update_attributes('output' => $collated_output);
1691   }
1692   return $collated_output;
1693 }
1694
1695 sub killem
1696 {
1697   foreach (@_)
1698   {
1699     my $sig = 2;                # SIGINT first
1700     if (exists $proc{$_}->{"sent_$sig"} &&
1701         time - $proc{$_}->{"sent_$sig"} > 4)
1702     {
1703       $sig = 15;                # SIGTERM if SIGINT doesn't work
1704     }
1705     if (exists $proc{$_}->{"sent_$sig"} &&
1706         time - $proc{$_}->{"sent_$sig"} > 4)
1707     {
1708       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1709     }
1710     if (!exists $proc{$_}->{"sent_$sig"})
1711     {
1712       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1713       kill $sig, $_;
1714       select (undef, undef, undef, 0.1);
1715       if ($sig == 2)
1716       {
1717         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1718       }
1719       $proc{$_}->{"sent_$sig"} = time;
1720       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1721     }
1722   }
1723 }
1724
1725
1726 sub fhbits
1727 {
1728   my($bits);
1729   for (@_) {
1730     vec($bits,fileno($_),1) = 1;
1731   }
1732   $bits;
1733 }
1734
1735
1736 # Send log output to Keep via arv-put.
1737 #
1738 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1739 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1740 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1741 # $log_pipe_pid is the pid of the arv-put subprocess.
1742 #
1743 # The only functions that should access these variables directly are:
1744 #
1745 # log_writer_start($logfilename)
1746 #     Starts an arv-put pipe, reading data on stdin and writing it to
1747 #     a $logfilename file in an output collection.
1748 #
1749 # log_writer_read_output([$timeout])
1750 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1751 #     Passes $timeout to the select() call, with a default of 0.01.
1752 #     Returns the result of the last read() call on $log_pipe_out, or
1753 #     -1 if read() wasn't called because select() timed out.
1754 #     Only other log_writer_* functions should need to call this.
1755 #
1756 # log_writer_send($txt)
1757 #     Writes $txt to the output log collection.
1758 #
1759 # log_writer_finish()
1760 #     Closes the arv-put pipe and returns the output that it produces.
1761 #
1762 # log_writer_is_active()
1763 #     Returns a true value if there is currently a live arv-put
1764 #     process, false otherwise.
1765 #
1766 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1767     $log_pipe_pid);
1768
1769 sub log_writer_start($)
1770 {
1771   my $logfilename = shift;
1772   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1773                         'arv-put',
1774                         '--stream',
1775                         '--retries', '6',
1776                         '--filename', $logfilename,
1777                         '-');
1778   $log_pipe_out_buf = "";
1779   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1780 }
1781
1782 sub log_writer_read_output {
1783   my $timeout = shift || 0.01;
1784   my $read = -1;
1785   while ($read && $log_pipe_out_select->can_read($timeout)) {
1786     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1787                  length($log_pipe_out_buf));
1788   }
1789   if (!defined($read)) {
1790     Log(undef, "error reading log manifest from arv-put: $!");
1791   }
1792   return $read;
1793 }
1794
1795 sub log_writer_send($)
1796 {
1797   my $txt = shift;
1798   print $log_pipe_in $txt;
1799   log_writer_read_output();
1800 }
1801
1802 sub log_writer_finish()
1803 {
1804   return unless $log_pipe_pid;
1805
1806   close($log_pipe_in);
1807
1808   my $logger_failed = 0;
1809   my $read_result = log_writer_read_output(600);
1810   if ($read_result == -1) {
1811     $logger_failed = -1;
1812     Log (undef, "timed out reading from 'arv-put'");
1813   } elsif ($read_result != 0) {
1814     $logger_failed = -2;
1815     Log(undef, "failed to read arv-put log manifest to EOF");
1816   }
1817
1818   waitpid($log_pipe_pid, 0);
1819   if ($?) {
1820     $logger_failed ||= $?;
1821     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1822   }
1823
1824   close($log_pipe_out);
1825   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1826   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1827       $log_pipe_out_select = undef;
1828
1829   return $arv_put_output;
1830 }
1831
1832 sub log_writer_is_active() {
1833   return $log_pipe_pid;
1834 }
1835
1836 sub Log                         # ($jobstepidx, $logmessage)
1837 {
1838   my ($jobstepidx, $logmessage) = @_;
1839   if ($logmessage =~ /\n/) {
1840     for my $line (split (/\n/, $_[1])) {
1841       Log ($jobstepidx, $line);
1842     }
1843     return;
1844   }
1845   my $fh = select STDERR; $|=1; select $fh;
1846   my $task_qseq = '';
1847   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1848     $task_qseq = $jobstepidx;
1849   }
1850   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1851   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1852   $message .= "\n";
1853   my $datetime;
1854   if (log_writer_is_active() || -t STDERR) {
1855     my @gmtime = gmtime;
1856     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1857                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1858   }
1859   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1860
1861   if (log_writer_is_active()) {
1862     log_writer_send($datetime . " " . $message);
1863   }
1864 }
1865
1866
1867 sub croak
1868 {
1869   my ($package, $file, $line) = caller;
1870   my $message = "@_ at $file line $line\n";
1871   Log (undef, $message);
1872   release_allocation();
1873   freeze() if @jobstep_todo;
1874   create_output_collection() if @jobstep_todo;
1875   cleanup();
1876   save_meta();
1877   die;
1878 }
1879
1880
1881 sub cleanup
1882 {
1883   return unless $Job;
1884   if ($Job->{'state'} eq 'Cancelled') {
1885     $Job->update_attributes('finished_at' => scalar gmtime);
1886   } else {
1887     $Job->update_attributes('state' => 'Failed');
1888   }
1889 }
1890
1891
1892 sub save_meta
1893 {
1894   my $justcheckpoint = shift; # false if this will be the last meta saved
1895   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1896   return unless log_writer_is_active();
1897   my $log_manifest = log_writer_finish();
1898   return unless defined($log_manifest);
1899
1900   if ($Job->{log}) {
1901     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1902     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1903   }
1904
1905   my $log_coll = api_call(
1906     "collections/create", ensure_unique_name => 1, collection => {
1907       manifest_text => $log_manifest,
1908       owner_uuid => $Job->{owner_uuid},
1909       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1910     });
1911   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1912   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1913
1914   return $log_coll->{portable_data_hash};
1915 }
1916
1917
1918 sub freeze_if_want_freeze
1919 {
1920   if ($main::please_freeze)
1921   {
1922     release_allocation();
1923     if (@_)
1924     {
1925       # kill some srun procs before freeze+stop
1926       map { $proc{$_} = {} } @_;
1927       while (%proc)
1928       {
1929         killem (keys %proc);
1930         select (undef, undef, undef, 0.1);
1931         my $died;
1932         while (($died = waitpid (-1, WNOHANG)) > 0)
1933         {
1934           delete $proc{$died};
1935         }
1936       }
1937     }
1938     freeze();
1939     create_output_collection();
1940     cleanup();
1941     save_meta();
1942     exit 1;
1943   }
1944 }
1945
1946
1947 sub freeze
1948 {
1949   Log (undef, "Freeze not implemented");
1950   return;
1951 }
1952
1953
1954 sub thaw
1955 {
1956   croak ("Thaw not implemented");
1957 }
1958
1959
1960 sub freezequote
1961 {
1962   my $s = shift;
1963   $s =~ s/\\/\\\\/g;
1964   $s =~ s/\n/\\n/g;
1965   return $s;
1966 }
1967
1968
1969 sub freezeunquote
1970 {
1971   my $s = shift;
1972   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1973   return $s;
1974 }
1975
1976 sub srun_sync
1977 {
1978   my $srunargs = shift;
1979   my $execargs = shift;
1980   my $opts = shift || {};
1981   my $stdin = shift;
1982
1983   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1984   Log (undef, "$label: start");
1985
1986   my ($stderr_r, $stderr_w);
1987   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1988
1989   my ($stdout_r, $stdout_w);
1990   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1991
1992   my $srunpid = fork();
1993   if ($srunpid == 0)
1994   {
1995     close($stderr_r);
1996     close($stdout_r);
1997     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1998     fcntl($stdout_w, F_SETFL, 0) or croak($!);
1999     open(STDERR, ">&", $stderr_w) or croak ($!);
2000     open(STDOUT, ">&", $stdout_w) or croak ($!);
2001     srun ($srunargs, $execargs, $opts, $stdin);
2002     exit (1);
2003   }
2004   close($stderr_w);
2005   close($stdout_w);
2006
2007   set_nonblocking($stderr_r);
2008   set_nonblocking($stdout_r);
2009
2010   # Add entries to @jobstep and %proc so check_squeue() and
2011   # freeze_if_want_freeze() can treat it like a job task process.
2012   push @jobstep, {
2013     stderr => '',
2014     stderr_at => 0,
2015     stderr_captured => '',
2016     stdout_r => $stdout_r,
2017     stdout_captured => '',
2018   };
2019   my $jobstepidx = $#jobstep;
2020   $proc{$srunpid} = {
2021     jobstepidx => $jobstepidx,
2022   };
2023   $reader{$jobstepidx} = $stderr_r;
2024
2025   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
2026     my $busy = readfrompipes();
2027     if (!$busy || ($latest_refresh + 2 < scalar time)) {
2028       check_refresh_wanted();
2029       check_squeue();
2030       check_sinfo();
2031     }
2032     if (!$busy) {
2033       select(undef, undef, undef, 0.1);
2034     }
2035     killem(keys %proc) if $main::please_freeze;
2036   }
2037   my $exited = $?;
2038
2039   1 while readfrompipes();
2040   process_stderr_final ($jobstepidx);
2041
2042   Log (undef, "$label: exit ".exit_status_s($exited));
2043
2044   close($stdout_r);
2045   close($stderr_r);
2046   delete $proc{$srunpid};
2047   delete $reader{$jobstepidx};
2048
2049   my $j = pop @jobstep;
2050   # If the srun showed signs of tempfail, ensure the caller treats that as a
2051   # failure case.
2052   if ($main::please_freeze || $j->{tempfail}) {
2053     $exited ||= 255;
2054   }
2055   return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
2056 }
2057
2058
2059 sub srun
2060 {
2061   my $srunargs = shift;
2062   my $execargs = shift;
2063   my $opts = shift || {};
2064   my $stdin = shift;
2065   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
2066
2067   $Data::Dumper::Terse = 1;
2068   $Data::Dumper::Indent = 0;
2069   my $show_cmd = Dumper($args);
2070   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2071   $show_cmd =~ s/\n/ /g;
2072   if ($opts->{fork}) {
2073     Log(undef, "starting: $show_cmd");
2074   } else {
2075     # This is a child process: parent is in charge of reading our
2076     # stderr and copying it to Log() if needed.
2077     warn "starting: $show_cmd\n";
2078   }
2079
2080   if (defined $stdin) {
2081     my $child = open STDIN, "-|";
2082     defined $child or die "no fork: $!";
2083     if ($child == 0) {
2084       print $stdin or die $!;
2085       close STDOUT or die $!;
2086       exit 0;
2087     }
2088   }
2089
2090   return system (@$args) if $opts->{fork};
2091
2092   exec @$args;
2093   warn "ENV size is ".length(join(" ",%ENV));
2094   die "exec failed: $!: @$args";
2095 }
2096
2097
2098 sub ban_node_by_slot {
2099   # Don't start any new jobsteps on this node for 60 seconds
2100   my $slotid = shift;
2101   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2102   $slot[$slotid]->{node}->{hold_count}++;
2103   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2104 }
2105
2106 sub must_lock_now
2107 {
2108   my ($lockfile, $error_message) = @_;
2109   open L, ">", $lockfile or croak("$lockfile: $!");
2110   if (!flock L, LOCK_EX|LOCK_NB) {
2111     croak("Can't lock $lockfile: $error_message\n");
2112   }
2113 }
2114
2115 sub find_docker_image {
2116   # Given a Keep locator, check to see if it contains a Docker image.
2117   # If so, return its stream name and Docker hash.
2118   # If not, return undef for both values.
2119   my $locator = shift;
2120   my ($streamname, $filename);
2121   my $image = api_call("collections/get", uuid => $locator);
2122   if ($image) {
2123     foreach my $line (split(/\n/, $image->{manifest_text})) {
2124       my @tokens = split(/\s+/, $line);
2125       next if (!@tokens);
2126       $streamname = shift(@tokens);
2127       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2128         if (defined($filename)) {
2129           return (undef, undef);  # More than one file in the Collection.
2130         } else {
2131           $filename = (split(/:/, $filedata, 3))[2];
2132         }
2133       }
2134     }
2135   }
2136   if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
2137     return ($streamname, $1);
2138   } else {
2139     return (undef, undef);
2140   }
2141 }
2142
2143 sub exit_retry_unlocked {
2144   Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED);
2145   exit(EX_RETRY_UNLOCKED);
2146 }
2147
2148 sub retry_count {
2149   # Calculate the number of times an operation should be retried,
2150   # assuming exponential backoff, and that we're willing to retry as
2151   # long as tasks have been running.  Enforce a minimum of 3 retries.
2152   my ($starttime, $endtime, $timediff, $retries);
2153   if (@jobstep) {
2154     $starttime = $jobstep[0]->{starttime};
2155     $endtime = $jobstep[-1]->{finishtime};
2156   }
2157   if (!defined($starttime)) {
2158     $timediff = 0;
2159   } elsif (!defined($endtime)) {
2160     $timediff = time - $starttime;
2161   } else {
2162     $timediff = ($endtime - $starttime) - (time - $endtime);
2163   }
2164   if ($timediff > 0) {
2165     $retries = int(log($timediff) / log(2));
2166   } else {
2167     $retries = 1;  # Use the minimum.
2168   }
2169   return ($retries > 3) ? $retries : 3;
2170 }
2171
2172 sub retry_op {
2173   # Pass in two function references.
2174   # This method will be called with the remaining arguments.
2175   # If it dies, retry it with exponential backoff until it succeeds,
2176   # or until the current retry_count is exhausted.  After each failure
2177   # that can be retried, the second function will be called with
2178   # the current try count (0-based), next try time, and error message.
2179   my $operation = shift;
2180   my $op_text = shift;
2181   my $retries = retry_count();
2182   my $retry_callback = sub {
2183     my ($try_count, $next_try_at, $errmsg) = @_;
2184     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2185     $errmsg =~ s/\s/ /g;
2186     $errmsg =~ s/\s+$//;
2187     my $retry_msg;
2188     if ($next_try_at < time) {
2189       $retry_msg = "Retrying.";
2190     } else {
2191       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2192       $retry_msg = "Retrying at $next_try_fmt.";
2193     }
2194     Log(undef, "$op_text failed: $errmsg. $retry_msg");
2195   };
2196   foreach my $try_count (0..$retries) {
2197     my $next_try = time + (2 ** $try_count);
2198     my $result = eval { $operation->(@_); };
2199     if (!$@) {
2200       return $result;
2201     } elsif ($try_count < $retries) {
2202       $retry_callback->($try_count, $next_try, $@);
2203       my $sleep_time = $next_try - time;
2204       sleep($sleep_time) if ($sleep_time > 0);
2205     }
2206   }
2207   # Ensure the error message ends in a newline, so Perl doesn't add
2208   # retry_op's line number to it.
2209   chomp($@);
2210   die($@ . "\n");
2211 }
2212
2213 sub api_call {
2214   # Pass in a /-separated API method name, and arguments for it.
2215   # This function will call that method, retrying as needed until
2216   # the current retry_count is exhausted, with a log on the first failure.
2217   my $method_name = shift;
2218   my $method = $arv;
2219   foreach my $key (split(/\//, $method_name)) {
2220     $method = $method->{$key};
2221   }
2222   return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
2223 }
2224
2225 sub exit_status_s {
2226   # Given a $?, return a human-readable exit code string like "0" or
2227   # "1" or "0 with signal 1" or "1 with signal 11".
2228   my $exitcode = shift;
2229   my $s = $exitcode >> 8;
2230   if ($exitcode & 0x7f) {
2231     $s .= " with signal " . ($exitcode & 0x7f);
2232   }
2233   if ($exitcode & 0x80) {
2234     $s .= " with core dump";
2235   }
2236   return $s;
2237 }
2238
2239 sub handle_readall {
2240   # Pass in a glob reference to a file handle.
2241   # Read all its contents and return them as a string.
2242   my $fh_glob_ref = shift;
2243   local $/ = undef;
2244   return <$fh_glob_ref>;
2245 }
2246
2247 sub tar_filename_n {
2248   my $n = shift;
2249   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2250 }
2251
2252 sub add_git_archive {
2253   # Pass in a git archive command as a string or list, a la system().
2254   # This method will save its output to be included in the archive sent to the
2255   # build script.
2256   my $git_input;
2257   $git_tar_count++;
2258   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2259     croak("Failed to save git archive: $!");
2260   }
2261   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2262   close($git_input);
2263   waitpid($git_pid, 0);
2264   close(GIT_ARCHIVE);
2265   if ($?) {
2266     croak("Failed to save git archive: git exited " . exit_status_s($?));
2267   }
2268 }
2269
2270 sub combined_git_archive {
2271   # Combine all saved tar archives into a single archive, then return its
2272   # contents in a string.  Return undef if no archives have been saved.
2273   if ($git_tar_count < 1) {
2274     return undef;
2275   }
2276   my $base_tar_name = tar_filename_n(1);
2277   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2278     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2279     if ($tar_exit != 0) {
2280       croak("Error preparing build archive: tar -A exited " .
2281             exit_status_s($tar_exit));
2282     }
2283   }
2284   if (!open(GIT_TAR, "<", $base_tar_name)) {
2285     croak("Could not open build archive: $!");
2286   }
2287   my $tar_contents = handle_readall(\*GIT_TAR);
2288   close(GIT_TAR);
2289   return $tar_contents;
2290 }
2291
2292 sub set_nonblocking {
2293   my $fh = shift;
2294   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2295   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2296 }
2297
2298 __DATA__
2299 #!/usr/bin/env perl
2300 #
2301 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2302 # server invokes this script on individual compute nodes, or localhost if we're
2303 # running a job locally.  It gets called in two modes:
2304 #
2305 # * No arguments: Installation mode.  Read a tar archive from the DATA
2306 #   file handle; it includes the Crunch script's source code, and
2307 #   maybe SDKs as well.  Those should be installed in the proper
2308 #   locations.  This runs outside of any Docker container, so don't try to
2309 #   introspect Crunch's runtime environment.
2310 #
2311 # * With arguments: Crunch script run mode.  This script should set up the
2312 #   environment, then run the command specified in the arguments.  This runs
2313 #   inside any Docker container.
2314
2315 use Fcntl ':flock';
2316 use File::Path qw( make_path remove_tree );
2317 use POSIX qw(getcwd);
2318
2319 use constant TASK_TEMPFAIL => 111;
2320
2321 # Map SDK subdirectories to the path environments they belong to.
2322 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2323
2324 my $destdir = $ENV{"CRUNCH_SRC"};
2325 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2326 my $repo = $ENV{"CRUNCH_SRC_URL"};
2327 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2328 my $job_work = $ENV{"JOB_WORK"};
2329 my $task_work = $ENV{"TASK_WORK"};
2330
2331 open(STDOUT_ORIG, ">&", STDOUT);
2332 open(STDERR_ORIG, ">&", STDERR);
2333
2334 for my $dir ($destdir, $job_work, $task_work) {
2335   if ($dir) {
2336     make_path $dir;
2337     -e $dir or die "Failed to create temporary directory ($dir): $!";
2338   }
2339 }
2340
2341 if ($task_work) {
2342   remove_tree($task_work, {keep_root => 1});
2343 }
2344
2345 ### Crunch script run mode
2346 if (@ARGV) {
2347   # We want to do routine logging during task 0 only.  This gives the user
2348   # the information they need, but avoids repeating the information for every
2349   # task.
2350   my $Log;
2351   if ($ENV{TASK_SEQUENCE} eq "0") {
2352     $Log = sub {
2353       my $msg = shift;
2354       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2355     };
2356   } else {
2357     $Log = sub { };
2358   }
2359
2360   my $python_src = "$install_dir/python";
2361   my $venv_dir = "$job_work/.arvados.venv";
2362   my $venv_built = -e "$venv_dir/bin/activate";
2363   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2364     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2365                  "--python=python2.7", $venv_dir);
2366     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2367     $venv_built = 1;
2368     $Log->("Built Python SDK virtualenv");
2369   }
2370
2371   my @pysdk_version_cmd = ("python", "-c",
2372     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2373   if ($venv_built) {
2374     $Log->("Running in Python SDK virtualenv");
2375     @pysdk_version_cmd = ();
2376     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2377     @ARGV = ("/bin/sh", "-ec",
2378              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2379   } elsif (-d $python_src) {
2380     $Log->("Warning: virtualenv not found inside Docker container default " .
2381            "\$PATH. Can't install Python SDK.");
2382   }
2383
2384   if (@pysdk_version_cmd) {
2385     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2386     my $pysdk_version = <$pysdk_version_pipe>;
2387     close($pysdk_version_pipe);
2388     if ($? == 0) {
2389       chomp($pysdk_version);
2390       $Log->("Using Arvados SDK version $pysdk_version");
2391     } else {
2392       # A lot could've gone wrong here, but pretty much all of it means that
2393       # Python won't be able to load the Arvados SDK.
2394       $Log->("Warning: Arvados SDK not found");
2395     }
2396   }
2397
2398   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2399     my $sdk_path = "$install_dir/$sdk_dir";
2400     if (-d $sdk_path) {
2401       if ($ENV{$sdk_envkey}) {
2402         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2403       } else {
2404         $ENV{$sdk_envkey} = $sdk_path;
2405       }
2406       $Log->("Arvados SDK added to %s", $sdk_envkey);
2407     }
2408   }
2409
2410   exec(@ARGV);
2411   die "Cannot exec `@ARGV`: $!";
2412 }
2413
2414 ### Installation mode
2415 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2416 flock L, LOCK_EX;
2417 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2418   # This exact git archive (source + arvados sdk) is already installed
2419   # here, so there's no need to reinstall it.
2420
2421   # We must consume our DATA section, though: otherwise the process
2422   # feeding it to us will get SIGPIPE.
2423   my $buf;
2424   while (read(DATA, $buf, 65536)) { }
2425
2426   exit(0);
2427 }
2428
2429 unlink "$destdir.archive_hash";
2430 mkdir $destdir;
2431
2432 do {
2433   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2434   local $SIG{PIPE} = "IGNORE";
2435   warn "Extracting archive: $archive_hash\n";
2436   # --ignore-zeros is necessary sometimes: depending on how much NUL
2437   # padding tar -A put on our combined archive (which in turn depends
2438   # on the length of the component archives) tar without
2439   # --ignore-zeros will exit before consuming stdin and cause close()
2440   # to fail on the resulting SIGPIPE.
2441   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2442     die "Error launching 'tar -xC $destdir': $!";
2443   }
2444   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2445   # get SIGPIPE.  We must feed it data incrementally.
2446   my $tar_input;
2447   while (read(DATA, $tar_input, 65536)) {
2448     print TARX $tar_input;
2449   }
2450   if(!close(TARX)) {
2451     die "'tar -xC $destdir' exited $?: $!";
2452   }
2453 };
2454
2455 mkdir $install_dir;
2456
2457 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2458 if (-d $sdk_root) {
2459   foreach my $sdk_lang (("python",
2460                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2461     if (-d "$sdk_root/$sdk_lang") {
2462       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2463         die "Failed to install $sdk_lang SDK: $!";
2464       }
2465     }
2466   }
2467 }
2468
2469 my $python_dir = "$install_dir/python";
2470 if ((-d $python_dir) and can_run("python2.7")) {
2471   open(my $egg_info_pipe, "-|",
2472        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2473   my @egg_info_errors = <$egg_info_pipe>;
2474   close($egg_info_pipe);
2475
2476   if ($?) {
2477     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2478       # egg_info apparently failed because it couldn't ask git for a build tag.
2479       # Specify no build tag.
2480       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2481       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2482       close($pysdk_cfg);
2483     } else {
2484       my $egg_info_exit = $? >> 8;
2485       foreach my $errline (@egg_info_errors) {
2486         warn $errline;
2487       }
2488       warn "python setup.py egg_info failed: exit $egg_info_exit";
2489       exit ($egg_info_exit || 1);
2490     }
2491   }
2492 }
2493
2494 # Hide messages from the install script (unless it fails: shell_or_die
2495 # will show $destdir.log in that case).
2496 open(STDOUT, ">>", "$destdir.log") or die ($!);
2497 open(STDERR, ">&", STDOUT) or die ($!);
2498
2499 if (-e "$destdir/crunch_scripts/install") {
2500     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2501 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2502     # Old version
2503     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2504 } elsif (-e "./install.sh") {
2505     shell_or_die (undef, "./install.sh", $install_dir);
2506 }
2507
2508 if ($archive_hash) {
2509     unlink "$destdir.archive_hash.new";
2510     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2511     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2512 }
2513
2514 close L;
2515
2516 sub can_run {
2517   my $command_name = shift;
2518   open(my $which, "-|", "which", $command_name) or die ($!);
2519   while (<$which>) { }
2520   close($which);
2521   return ($? == 0);
2522 }
2523
2524 sub shell_or_die
2525 {
2526   my $exitcode = shift;
2527
2528   if ($ENV{"DEBUG"}) {
2529     print STDERR "@_\n";
2530   }
2531   if (system (@_) != 0) {
2532     my $err = $!;
2533     my $code = $?;
2534     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2535     open STDERR, ">&STDERR_ORIG";
2536     system ("cat $destdir.log >&2");
2537     warn "@_ failed ($err): $exitstatus";
2538     if (defined($exitcode)) {
2539       exit $exitcode;
2540     }
2541     else {
2542       exit (($code >> 8) || 1);
2543     }
2544   }
2545 }
2546
2547 __DATA__