Merge branch '12550-crunch1-exit-race'
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3 # Copyright (C) The Arvados Authors. All rights reserved.
4 #
5 # SPDX-License-Identifier: AGPL-3.0
6
7 =head1 NAME
8
9 crunch-job: Execute job steps, save snapshots as requested, collate output.
10
11 =head1 SYNOPSIS
12
13 Obtain job details from Arvados, run tasks on compute nodes (typically
14 invoked by scheduler on controller):
15
16  crunch-job --job x-y-z --git-dir /path/to/repo/.git
17
18 Obtain job details from command line, run tasks on local machine
19 (typically invoked by application or developer on VM):
20
21  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
22
23  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
24
25 =head1 OPTIONS
26
27 =over
28
29 =item --force-unlock
30
31 If the job is already locked, steal the lock and run it anyway.
32
33 =item --git-dir
34
35 Path to a .git directory (or a git URL) where the commit given in the
36 job's C<script_version> attribute is to be found. If this is I<not>
37 given, the job's C<repository> attribute will be used.
38
39 =item --job-api-token
40
41 Arvados API authorization token to use during the course of the job.
42
43 =item --no-clear-tmp
44
45 Do not clear per-job/task temporary directories during initial job
46 setup. This can speed up development and debugging when running jobs
47 locally.
48
49 =item --job
50
51 UUID of the job to run, or a JSON-encoded job resource without a
52 UUID. If the latter is given, a new job object will be created.
53
54 =back
55
56 =head1 RUNNING JOBS LOCALLY
57
58 crunch-job's log messages appear on stderr along with the job tasks'
59 stderr streams. The log is saved in Keep at each checkpoint and when
60 the job finishes.
61
62 If the job succeeds, the job's output locator is printed on stdout.
63
64 While the job is running, the following signals are accepted:
65
66 =over
67
68 =item control-C, SIGINT, SIGQUIT
69
70 Save a checkpoint, terminate any job tasks that are running, and stop.
71
72 =item SIGALRM
73
74 Save a checkpoint and continue.
75
76 =item SIGHUP
77
78 Refresh node allocation (i.e., check whether any nodes have been added
79 or unallocated) and attributes of the Job record that should affect
80 behavior (e.g., cancel job if cancelled_at becomes non-nil).
81
82 =back
83
84 =cut
85
86
87 use strict;
88 use POSIX ':sys_wait_h';
89 use POSIX qw(strftime);
90 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
91 use Arvados;
92 use Cwd qw(realpath);
93 use Data::Dumper;
94 use Digest::MD5 qw(md5_hex);
95 use Getopt::Long;
96 use IPC::Open2;
97 use IO::Select;
98 use File::Temp;
99 use Fcntl ':flock';
100 use File::Path qw( make_path remove_tree );
101
102 use constant TASK_TEMPFAIL => 111;
103 use constant EX_TEMPFAIL => 75;
104 use constant EX_RETRY_UNLOCKED => 93;
105
106 $ENV{"TMPDIR"} ||= "/tmp";
107 unless (defined $ENV{"CRUNCH_TMP"}) {
108   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
109   if ($ENV{"USER"} ne "crunch" && $< != 0) {
110     # use a tmp dir unique for my uid
111     $ENV{"CRUNCH_TMP"} .= "-$<";
112   }
113 }
114
115 # Create the tmp directory if it does not exist
116 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
117   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
118 }
119
120 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
121 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
122 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
123 mkdir ($ENV{"JOB_WORK"});
124
125 my %proc;
126 my $force_unlock;
127 my $git_dir;
128 my $jobspec;
129 my $job_api_token;
130 my $no_clear_tmp;
131 my $resume_stash;
132 my $cgroup_root = "/sys/fs/cgroup";
133 my $docker_bin = "docker.io";
134 my $docker_run_args = "";
135 GetOptions('force-unlock' => \$force_unlock,
136            'git-dir=s' => \$git_dir,
137            'job=s' => \$jobspec,
138            'job-api-token=s' => \$job_api_token,
139            'no-clear-tmp' => \$no_clear_tmp,
140            'resume-stash=s' => \$resume_stash,
141            'cgroup-root=s' => \$cgroup_root,
142            'docker-bin=s' => \$docker_bin,
143            'docker-run-args=s' => \$docker_run_args,
144     );
145
146 if (defined $job_api_token) {
147   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
148 }
149
150 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
151
152
153 $SIG{'USR1'} = sub
154 {
155   $main::ENV{CRUNCH_DEBUG} = 1;
156 };
157 $SIG{'USR2'} = sub
158 {
159   $main::ENV{CRUNCH_DEBUG} = 0;
160 };
161
162 my $arv = Arvados->new('apiVersion' => 'v1');
163
164 my $Job;
165 my $job_id;
166 my $dbh;
167 my $sth;
168 my @jobstep;
169
170 my $local_job;
171 if ($jobspec =~ /^[-a-z\d]+$/)
172 {
173   # $jobspec is an Arvados UUID, not a JSON job specification
174   $Job = api_call("jobs/get", uuid => $jobspec);
175   $local_job = 0;
176 }
177 else
178 {
179   $local_job = JSON::decode_json($jobspec);
180 }
181
182
183 # Make sure our workers (our slurm nodes, localhost, or whatever) are
184 # at least able to run basic commands: they aren't down or severely
185 # misconfigured.
186 my $cmd = ['true'];
187 if (($Job || $local_job)->{docker_image_locator}) {
188   $cmd = [$docker_bin, 'ps', '-q'];
189 }
190 Log(undef, "Sanity check is `@$cmd`");
191 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
192   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
193   $cmd,
194   {label => "sanity check"});
195 if ($exited != 0) {
196   Log(undef, "Sanity check failed: ".exit_status_s($exited));
197   exit EX_TEMPFAIL;
198 }
199 Log(undef, "Sanity check OK");
200
201
202 my $User = api_call("users/current");
203
204 if (!$local_job) {
205   if (!$force_unlock) {
206     # Claim this job, and make sure nobody else does
207     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
208     if ($@) {
209       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
210       exit EX_TEMPFAIL;
211     };
212   }
213 }
214 else
215 {
216   if (!$resume_stash)
217   {
218     map { croak ("No $_ specified") unless $local_job->{$_} }
219     qw(script script_version script_parameters);
220   }
221
222   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
223   $local_job->{'started_at'} = gmtime;
224   $local_job->{'state'} = 'Running';
225
226   $Job = api_call("jobs/create", job => $local_job);
227 }
228 $job_id = $Job->{'uuid'};
229
230 my $keep_logfile = $job_id . '.log.txt';
231 log_writer_start($keep_logfile);
232
233 $Job->{'runtime_constraints'} ||= {};
234 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
235 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
236
237 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
238 if ($? == 0) {
239   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
240   chomp($gem_versions);
241   chop($gem_versions);  # Closing parentheses
242 } else {
243   $gem_versions = "";
244 }
245 Log(undef,
246     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
247
248 Log (undef, "check slurm allocation");
249 my @slot;
250 my @node;
251 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
252 my @sinfo;
253 if (!$have_slurm)
254 {
255   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
256   push @sinfo, "$localcpus localhost";
257 }
258 if (exists $ENV{SLURM_NODELIST})
259 {
260   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
261 }
262 foreach (@sinfo)
263 {
264   my ($ncpus, $slurm_nodelist) = split;
265   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
266
267   my @nodelist;
268   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
269   {
270     my $nodelist = $1;
271     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
272     {
273       my $ranges = $1;
274       foreach (split (",", $ranges))
275       {
276         my ($a, $b);
277         if (/(\d+)-(\d+)/)
278         {
279           $a = $1;
280           $b = $2;
281         }
282         else
283         {
284           $a = $_;
285           $b = $_;
286         }
287         push @nodelist, map {
288           my $n = $nodelist;
289           $n =~ s/\[[-,\d]+\]/$_/;
290           $n;
291         } ($a..$b);
292       }
293     }
294     else
295     {
296       push @nodelist, $nodelist;
297     }
298   }
299   foreach my $nodename (@nodelist)
300   {
301     Log (undef, "node $nodename - $ncpus slots");
302     my $node = { name => $nodename,
303                  ncpus => $ncpus,
304                  # The number of consecutive times a task has been dispatched
305                  # to this node and failed.
306                  losing_streak => 0,
307                  # The number of consecutive times that SLURM has reported
308                  # a node failure since the last successful task.
309                  fail_count => 0,
310                  # Don't dispatch work to this node until this time
311                  # (in seconds since the epoch) has passed.
312                  hold_until => 0 };
313     foreach my $cpu (1..$ncpus)
314     {
315       push @slot, { node => $node,
316                     cpu => $cpu };
317     }
318   }
319   push @node, @nodelist;
320 }
321
322
323
324 # Ensure that we get one jobstep running on each allocated node before
325 # we start overloading nodes with concurrent steps
326
327 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
328
329
330 $Job->update_attributes(
331   'tasks_summary' => { 'failed' => 0,
332                        'todo' => 1,
333                        'running' => 0,
334                        'done' => 0 });
335
336 Log (undef, "start");
337 $SIG{'INT'} = sub { $main::please_freeze = 1; };
338 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
339 $SIG{'TERM'} = \&croak;
340 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
341 $SIG{'ALRM'} = sub { $main::please_info = 1; };
342 $SIG{'CONT'} = sub { $main::please_continue = 1; };
343 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
344
345 $main::please_freeze = 0;
346 $main::please_info = 0;
347 $main::please_continue = 0;
348 $main::please_refresh = 0;
349 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
350
351 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
352 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
353 $ENV{"JOB_UUID"} = $job_id;
354
355
356 my @jobstep_todo = ();
357 my @jobstep_done = ();
358 my @jobstep_tomerge = ();
359 my $jobstep_tomerge_level = 0;
360 my $squeue_checked = 0;
361 my $sinfo_checked = 0;
362 my $latest_refresh = scalar time;
363
364
365
366 if (defined $Job->{thawedfromkey})
367 {
368   thaw ($Job->{thawedfromkey});
369 }
370 else
371 {
372   my $first_task = api_call("job_tasks/create", job_task => {
373     'job_uuid' => $Job->{'uuid'},
374     'sequence' => 0,
375     'qsequence' => 0,
376     'parameters' => {},
377   });
378   push @jobstep, { 'level' => 0,
379                    'failures' => 0,
380                    'arvados_task' => $first_task,
381                  };
382   push @jobstep_todo, 0;
383 }
384
385
386 if (!$have_slurm)
387 {
388   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
389 }
390
391 my $build_script = handle_readall(\*DATA);
392 my $nodelist = join(",", @node);
393 my $git_tar_count = 0;
394
395 if (!defined $no_clear_tmp) {
396   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
397   # up work directories crunch_tmp/work, crunch_tmp/opt,
398   # crunch_tmp/src*.
399   my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
400     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
401     ['bash', '-ec', q{
402 arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
403 rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
404     }],
405     {label => "clean work dirs"});
406   if ($exited != 0) {
407     exit_retry_unlocked();
408   }
409 }
410
411 # If this job requires a Docker image, install that.
412 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
413 if ($docker_locator = $Job->{docker_image_locator}) {
414   Log (undef, "Install docker image $docker_locator");
415   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
416   if (!$docker_hash)
417   {
418     croak("No Docker image hash found from locator $docker_locator");
419   }
420   Log (undef, "docker image hash is $docker_hash");
421   $docker_stream =~ s/^\.//;
422   my $docker_install_script = qq{
423 loaded() {
424   id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
425   echo "image ID is \$id"
426   [[ \${id} = \Q$docker_hash\E ]]
427 }
428 if loaded >&2 2>/dev/null; then
429   echo >&2 "image is already present"
430   exit 0
431 fi
432 echo >&2 "docker image is not present; loading"
433 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
434 if ! loaded >&2; then
435   echo >&2 "`docker load` exited 0, but image is not found (!)"
436   exit 1
437 fi
438 echo >&2 "image loaded successfully"
439 };
440
441   my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
442     ["srun", "--nodelist=" . join(',', @node)],
443     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
444     {label => "load docker image"});
445   if ($exited != 0)
446   {
447     exit_retry_unlocked();
448   }
449
450   # Determine whether this version of Docker supports memory+swap limits.
451   ($exited, $stdout, $stderr, $tempfail) = srun_sync(
452     ["srun", "--nodes=1"],
453     [$docker_bin, 'run', '--help'],
454     {label => "check --memory-swap feature"});
455   if ($tempfail) {
456     exit_retry_unlocked();
457   }
458   $docker_limitmem = ($stdout =~ /--memory-swap/);
459
460   # Find a non-root Docker user to use.
461   # Tries the default user for the container, then 'crunch', then 'nobody',
462   # testing for whether the actual user id is non-zero.  This defends against
463   # mistakes but not malice, but we intend to harden the security in the future
464   # so we don't want anyone getting used to their jobs running as root in their
465   # Docker containers.
466   my @tryusers = ("", "crunch", "nobody");
467   foreach my $try_user (@tryusers) {
468     my $label;
469     my $try_user_arg;
470     if ($try_user eq "") {
471       $label = "check whether default user is UID 0";
472       $try_user_arg = "";
473     } else {
474       $label = "check whether user '$try_user' is UID 0";
475       $try_user_arg = "--user=$try_user";
476     }
477     my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
478       ["srun", "--nodes=1"],
479       ["/bin/sh", "-ec",
480        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
481       {label => $label});
482     chomp($stdout);
483     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
484       $dockeruserarg = $try_user_arg;
485       if ($try_user eq "") {
486         Log(undef, "Container will run with default user");
487       } else {
488         Log(undef, "Container will run with $dockeruserarg");
489       }
490       last;
491     } elsif ($tempfail) {
492       exit_retry_unlocked();
493     }
494   }
495
496   if (!defined $dockeruserarg) {
497     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
498   }
499
500   if ($Job->{arvados_sdk_version}) {
501     # The job also specifies an Arvados SDK version.  Add the SDKs to the
502     # tar file for the build script to install.
503     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
504                        $Job->{arvados_sdk_version}));
505     add_git_archive("git", "--git-dir=$git_dir", "archive",
506                     "--prefix=.arvados.sdk/",
507                     $Job->{arvados_sdk_version}, "sdk");
508   }
509 }
510
511 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
512   # If script_version looks like an absolute path, *and* the --git-dir
513   # argument was not given -- which implies we were not invoked by
514   # crunch-dispatch -- we will use the given path as a working
515   # directory instead of resolving script_version to a git commit (or
516   # doing anything else with git).
517   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
518   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
519 }
520 else {
521   # Resolve the given script_version to a git commit sha1. Also, if
522   # the repository is remote, clone it into our local filesystem: this
523   # ensures "git archive" will work, and is necessary to reliably
524   # resolve a symbolic script_version like "master^".
525   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
526
527   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
528
529   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
530
531   # If we're running under crunch-dispatch, it will have already
532   # pulled the appropriate source tree into its own repository, and
533   # given us that repo's path as $git_dir.
534   #
535   # If we're running a "local" job, we might have to fetch content
536   # from a remote repository.
537   #
538   # (Currently crunch-dispatch gives a local path with --git-dir, but
539   # we might as well accept URLs there too in case it changes its
540   # mind.)
541   my $repo = $git_dir || $Job->{'repository'};
542
543   # Repository can be remote or local. If remote, we'll need to fetch it
544   # to a local dir before doing `git log` et al.
545   my $repo_location;
546
547   if ($repo =~ m{://|^[^/]*:}) {
548     # $repo is a git url we can clone, like git:// or https:// or
549     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
550     # not recognized here because distinguishing that from a local
551     # path is too fragile. If you really need something strange here,
552     # use the ssh:// form.
553     $repo_location = 'remote';
554   } elsif ($repo =~ m{^\.*/}) {
555     # $repo is a local path to a git index. We'll also resolve ../foo
556     # to ../foo/.git if the latter is a directory. To help
557     # disambiguate local paths from named hosted repositories, this
558     # form must be given as ./ or ../ if it's a relative path.
559     if (-d "$repo/.git") {
560       $repo = "$repo/.git";
561     }
562     $repo_location = 'local';
563   } else {
564     # $repo is none of the above. It must be the name of a hosted
565     # repository.
566     my $arv_repo_list = api_call("repositories/list",
567                                  'filters' => [['name','=',$repo]]);
568     my @repos_found = @{$arv_repo_list->{'items'}};
569     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
570     if ($n_found > 0) {
571       Log(undef, "Repository '$repo' -> "
572           . join(", ", map { $_->{'uuid'} } @repos_found));
573     }
574     if ($n_found != 1) {
575       croak("Error: Found $n_found repositories with name '$repo'.");
576     }
577     $repo = $repos_found[0]->{'fetch_url'};
578     $repo_location = 'remote';
579   }
580   Log(undef, "Using $repo_location repository '$repo'");
581   $ENV{"CRUNCH_SRC_URL"} = $repo;
582
583   # Resolve given script_version (we'll call that $treeish here) to a
584   # commit sha1 ($commit).
585   my $treeish = $Job->{'script_version'};
586   my $commit;
587   if ($repo_location eq 'remote') {
588     # We minimize excess object-fetching by re-using the same bare
589     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
590     # just keep adding remotes to it as needed.
591     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
592     my $gitcmd = "git --git-dir=\Q$local_repo\E";
593
594     # Set up our local repo for caching remote objects, making
595     # archives, etc.
596     if (!-d $local_repo) {
597       make_path($local_repo) or croak("Error: could not create $local_repo");
598     }
599     # This works (exits 0 and doesn't delete fetched objects) even
600     # if $local_repo is already initialized:
601     `$gitcmd init --bare`;
602     if ($?) {
603       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
604     }
605
606     # If $treeish looks like a hash (or abbrev hash) we look it up in
607     # our local cache first, since that's cheaper. (We don't want to
608     # do that with tags/branches though -- those change over time, so
609     # they should always be resolved by the remote repo.)
610     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
611       # Hide stderr because it's normal for this to fail:
612       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
613       if ($? == 0 &&
614           # Careful not to resolve a branch named abcdeff to commit 1234567:
615           $sha1 =~ /^$treeish/ &&
616           $sha1 =~ /^([0-9a-f]{40})$/s) {
617         $commit = $1;
618         Log(undef, "Commit $commit already present in $local_repo");
619       }
620     }
621
622     if (!defined $commit) {
623       # If $treeish isn't just a hash or abbrev hash, or isn't here
624       # yet, we need to fetch the remote to resolve it correctly.
625
626       # First, remove all local heads. This prevents a name that does
627       # not exist on the remote from resolving to (or colliding with)
628       # a previously fetched branch or tag (possibly from a different
629       # remote).
630       remove_tree("$local_repo/refs/heads", {keep_root => 1});
631
632       Log(undef, "Fetching objects from $repo to $local_repo");
633       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
634       if ($?) {
635         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
636       }
637     }
638
639     # Now that the data is all here, we will use our local repo for
640     # the rest of our git activities.
641     $repo = $local_repo;
642   }
643
644   my $gitcmd = "git --git-dir=\Q$repo\E";
645   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
646   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
647     croak("`$gitcmd rev-list` exited "
648           .exit_status_s($?)
649           .", '$treeish' not found, giving up");
650   }
651   $commit = $1;
652   Log(undef, "Version $treeish is commit $commit");
653
654   if ($commit ne $Job->{'script_version'}) {
655     # Record the real commit id in the database, frozentokey, logs,
656     # etc. -- instead of an abbreviation or a branch name which can
657     # become ambiguous or point to a different commit in the future.
658     if (!$Job->update_attributes('script_version' => $commit)) {
659       croak("Error: failed to update job's script_version attribute");
660     }
661   }
662
663   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
664   add_git_archive("$gitcmd archive ''\Q$commit\E");
665 }
666
667 my $git_archive = combined_git_archive();
668 if (!defined $git_archive) {
669   Log(undef, "Skip install phase (no git archive)");
670   if ($have_slurm) {
671     Log(undef, "Warning: This probably means workers have no source tree!");
672   }
673 }
674 else {
675   my $exited;
676   my $install_script_tries_left = 3;
677   for (my $attempts = 0; $attempts < 3; $attempts++) {
678     my @srunargs = ("srun",
679                     "--nodelist=$nodelist",
680                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
681     my @execargs = ("sh", "-c",
682                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
683
684     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
685     my ($stdout, $stderr, $tempfail);
686     ($exited, $stdout, $stderr, $tempfail) = srun_sync(
687       \@srunargs, \@execargs,
688       {label => "run install script on all workers"},
689         $build_script . $git_archive);
690     if ($tempfail) {
691       exit_retry_unlocked();
692     }
693
694     my $stderr_anything_from_script = 0;
695     for my $line (split(/\n/, $stderr)) {
696       if ($line !~ /^(srun: error: |starting: \[)/) {
697         $stderr_anything_from_script = 1;
698       }
699     }
700
701     last if $exited == 0 || $main::please_freeze;
702
703     # If the install script fails but doesn't print an error message,
704     # the next thing anyone is likely to do is just run it again in
705     # case it was a transient problem like "slurm communication fails
706     # because the network isn't reliable enough". So we'll just do
707     # that ourselves (up to 3 attempts in total). OTOH, if there is an
708     # error message, the problem is more likely to have a real fix and
709     # we should fail the job so the fixing process can start, instead
710     # of doing 2 more attempts.
711     last if $stderr_anything_from_script;
712   }
713
714   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
715     unlink($tar_filename);
716   }
717
718   if ($exited != 0) {
719     croak("Giving up");
720   }
721 }
722
723 foreach (qw (script script_version script_parameters runtime_constraints))
724 {
725   Log (undef,
726        "$_ " .
727        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
728 }
729 foreach (split (/\n/, $Job->{knobs}))
730 {
731   Log (undef, "knob " . $_);
732 }
733 my $resp = api_call(
734   'nodes/list',
735   'filters' => [['hostname', 'in', \@node]],
736   'order' => 'hostname',
737   'limit' => scalar(@node),
738     );
739 for my $n (@{$resp->{items}}) {
740   Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
741 }
742
743
744
745 $main::success = undef;
746
747
748
749 ONELEVEL:
750
751 my $thisround_succeeded = 0;
752 my $thisround_failed = 0;
753 my $thisround_failed_multiple = 0;
754 my $working_slot_count = scalar(@slot);
755
756 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
757                        or $a <=> $b } @jobstep_todo;
758 my $level = $jobstep[$jobstep_todo[0]]->{level};
759
760 my $initial_tasks_this_level = 0;
761 foreach my $id (@jobstep_todo) {
762   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
763 }
764
765 # If the number of tasks scheduled at this level #T is smaller than the number
766 # of slots available #S, only use the first #T slots, or the first slot on
767 # each node, whichever number is greater.
768 #
769 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
770 # based on these numbers.  Using fewer slots makes more resources available
771 # to each individual task, which should normally be a better strategy when
772 # there are fewer of them running with less parallelism.
773 #
774 # Note that this calculation is not redone if the initial tasks at
775 # this level queue more tasks at the same level.  This may harm
776 # overall task throughput for that level.
777 my @freeslot;
778 if ($initial_tasks_this_level < @node) {
779   @freeslot = (0..$#node);
780 } elsif ($initial_tasks_this_level < @slot) {
781   @freeslot = (0..$initial_tasks_this_level - 1);
782 } else {
783   @freeslot = (0..$#slot);
784 }
785 my $round_num_freeslots = scalar(@freeslot);
786 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
787
788 my %round_max_slots = ();
789 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
790   my $this_slot = $slot[$freeslot[$ii]];
791   my $node_name = $this_slot->{node}->{name};
792   $round_max_slots{$node_name} ||= $this_slot->{cpu};
793   last if (scalar(keys(%round_max_slots)) >= @node);
794 }
795
796 Log(undef, "start level $level with $round_num_freeslots slots");
797 my @holdslot;
798 my %reader;
799 my $progress_is_dirty = 1;
800 my $progress_stats_updated = 0;
801
802 update_progress_stats();
803
804
805 THISROUND:
806 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
807 {
808   # Don't create new tasks if we already know the job's final result.
809   last if defined($main::success);
810
811   my $id = $jobstep_todo[$todo_ptr];
812   my $Jobstep = $jobstep[$id];
813   if ($Jobstep->{level} != $level)
814   {
815     next;
816   }
817
818   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
819   set_nonblocking($reader{$id});
820
821   my $childslot = $freeslot[0];
822   my $childnode = $slot[$childslot]->{node};
823   my $childslotname = join (".",
824                             $slot[$childslot]->{node}->{name},
825                             $slot[$childslot]->{cpu});
826
827   my $childpid = fork();
828   if ($childpid == 0)
829   {
830     $SIG{'INT'} = 'DEFAULT';
831     $SIG{'QUIT'} = 'DEFAULT';
832     $SIG{'TERM'} = 'DEFAULT';
833
834     foreach (values (%reader))
835     {
836       close($_);
837     }
838     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
839     open(STDOUT,">&writer") or croak ($!);
840     open(STDERR,">&writer") or croak ($!);
841
842     undef $dbh;
843     undef $sth;
844
845     delete $ENV{"GNUPGHOME"};
846     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
847     $ENV{"TASK_QSEQUENCE"} = $id;
848     $ENV{"TASK_SEQUENCE"} = $level;
849     $ENV{"JOB_SCRIPT"} = $Job->{script};
850     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
851       $param =~ tr/a-z/A-Z/;
852       $ENV{"JOB_PARAMETER_$param"} = $value;
853     }
854     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
855     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
856     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
857     $ENV{"HOME"} = $ENV{"TASK_WORK"};
858     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
859     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
860     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
861
862     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
863
864     $ENV{"GZIP"} = "-n";
865
866     my @srunargs = (
867       "srun",
868       "--nodelist=".$childnode->{name},
869       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
870       "--job-name=$job_id.$id.$$",
871         );
872
873     my $stdbuf = " stdbuf --output=0 --error=0 ";
874
875     my $arv_file_cache = "";
876     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
877       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
878     }
879
880     my $command =
881         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
882         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
883         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
884         # These environment variables get used explicitly later in
885         # $command.  No tool is expected to read these values directly.
886         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
887         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
888         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
889         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
890         .q{&& declare -a VOLUMES=() }
891         .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
892         .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
893         .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
894
895     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
896     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
897     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
898
899     if ($docker_hash)
900     {
901       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
902       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
903       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
904       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
905       # We only set memory limits if Docker lets us limit both memory and swap.
906       # Memory limits alone have been supported longer, but subprocesses tend
907       # to get SIGKILL if they exceed that without any swap limit set.
908       # See #5642 for additional background.
909       if ($docker_limitmem) {
910         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
911       }
912
913       # The source tree and $destdir directory (which we have
914       # installed on the worker host) are available in the container,
915       # under the same path.
916       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
917       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
918
919       # Currently, we make the "by_pdh" directory in arv-mount's mount
920       # point appear at /keep inside the container (instead of using
921       # the same path as the host like we do with CRUNCH_SRC and
922       # CRUNCH_INSTALL). However, crunch scripts and utilities must
923       # not rely on this. They must use $TASK_KEEPMOUNT.
924       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
925       $ENV{TASK_KEEPMOUNT} = "/keep";
926
927       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
928       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
929       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
930
931       # TASK_WORK is almost exactly like a docker data volume: it
932       # starts out empty, is writable, and persists until no
933       # containers use it any more. We don't use --volumes-from to
934       # share it with other containers: it is only accessible to this
935       # task, and it goes away when this task stops.
936       #
937       # However, a docker data volume is writable only by root unless
938       # the mount point already happens to exist in the container with
939       # different permissions. Therefore, we [1] assume /tmp already
940       # exists in the image and is writable by the crunch user; [2]
941       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
942       # writable if they are created by docker while setting up the
943       # other --volumes); and [3] create $TASK_WORK inside the
944       # container using $build_script.
945       $command .= "--volume=/tmp ";
946       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
947       $ENV{"HOME"} = $ENV{"TASK_WORK"};
948       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
949
950       # TODO: Share a single JOB_WORK volume across all task
951       # containers on a given worker node, and delete it when the job
952       # ends (and, in case that doesn't work, when the next job
953       # starts).
954       #
955       # For now, use the same approach as TASK_WORK above.
956       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
957
958       # Bind mount the crunchrunner binary and host TLS certificates file into
959       # the container.
960       $command .= '"${VOLUMES[@]}" ';
961
962       while (my ($env_key, $env_val) = each %ENV)
963       {
964         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
965           $command .= "--env=\Q$env_key=$env_val\E ";
966         }
967       }
968       $command .= "--env=\QHOME=$ENV{HOME}\E ";
969       $command .= "\Q$docker_hash\E ";
970
971       if ($Job->{arvados_sdk_version}) {
972         $command .= $stdbuf;
973         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
974       } else {
975         $command .= "/bin/sh -c \'python -c " .
976             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
977             ">&2 2>/dev/null; " .
978             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
979             "if which stdbuf >/dev/null ; then " .
980             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
981             " else " .
982             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
983             " fi\'";
984       }
985     } else {
986       # Non-docker run
987       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
988       $command .= $stdbuf;
989       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
990     }
991
992     my @execargs = ('bash', '-c', $command);
993     srun (\@srunargs, \@execargs, undef, $build_script);
994     # exec() failed, we assume nothing happened.
995     die "srun() failed on build script\n";
996   }
997   close("writer");
998   if (!defined $childpid)
999   {
1000     close $reader{$id};
1001     delete $reader{$id};
1002     next;
1003   }
1004   shift @freeslot;
1005   $proc{$childpid} = {
1006     jobstepidx => $id,
1007     time => time,
1008     slot => $childslot,
1009     jobstepname => "$job_id.$id.$childpid",
1010   };
1011   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1012   $slot[$childslot]->{pid} = $childpid;
1013
1014   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1015   Log ($id, "child $childpid started on $childslotname");
1016   $Jobstep->{starttime} = time;
1017   $Jobstep->{node} = $childnode->{name};
1018   $Jobstep->{slotindex} = $childslot;
1019   delete $Jobstep->{stderr};
1020   delete $Jobstep->{finishtime};
1021   delete $Jobstep->{tempfail};
1022
1023   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1024   retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1025
1026   splice @jobstep_todo, $todo_ptr, 1;
1027   --$todo_ptr;
1028
1029   $progress_is_dirty = 1;
1030
1031   while (!@freeslot
1032          ||
1033          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1034   {
1035     last THISROUND if $main::please_freeze;
1036     if ($main::please_info)
1037     {
1038       $main::please_info = 0;
1039       freeze();
1040       create_output_collection();
1041       save_meta(1);
1042       update_progress_stats();
1043     }
1044     my $gotsome
1045         = readfrompipes ()
1046         + reapchildren ();
1047     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1048     {
1049       check_refresh_wanted();
1050       check_squeue();
1051       update_progress_stats();
1052     }
1053     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1054     {
1055       update_progress_stats();
1056     }
1057     if (!$gotsome) {
1058       select (undef, undef, undef, 0.1);
1059     }
1060     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1061                                         $_->{node}->{hold_count} < 4 } @slot);
1062     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1063         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1064     {
1065       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1066           .($thisround_failed+$thisround_succeeded)
1067           .") -- giving up on this round";
1068       Log (undef, $message);
1069       last THISROUND;
1070     }
1071
1072     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1073     for (my $i=$#freeslot; $i>=0; $i--) {
1074       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1075         push @holdslot, (splice @freeslot, $i, 1);
1076       }
1077     }
1078     for (my $i=$#holdslot; $i>=0; $i--) {
1079       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1080         push @freeslot, (splice @holdslot, $i, 1);
1081       }
1082     }
1083
1084     # give up if no nodes are succeeding
1085     if ($working_slot_count < 1) {
1086       Log(undef, "Every node has failed -- giving up");
1087       last THISROUND;
1088     }
1089   }
1090 }
1091
1092
1093 push @freeslot, splice @holdslot;
1094 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1095
1096
1097 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1098 while (%proc)
1099 {
1100   if ($main::please_continue) {
1101     $main::please_continue = 0;
1102     goto THISROUND;
1103   }
1104   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1105   readfrompipes ();
1106   if (!reapchildren())
1107   {
1108     check_refresh_wanted();
1109     check_squeue();
1110     update_progress_stats();
1111     select (undef, undef, undef, 0.1);
1112     killem (keys %proc) if $main::please_freeze;
1113   }
1114 }
1115
1116 update_progress_stats();
1117 freeze_if_want_freeze();
1118
1119
1120 if (!defined $main::success)
1121 {
1122   if (!@jobstep_todo) {
1123     $main::success = 1;
1124   } elsif ($working_slot_count < 1) {
1125     save_output_collection();
1126     save_meta();
1127     exit_retry_unlocked();
1128   } elsif ($thisround_succeeded == 0 &&
1129            ($thisround_failed == 0 || $thisround_failed > 4)) {
1130     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1131     Log (undef, $message);
1132     $main::success = 0;
1133   }
1134 }
1135
1136 goto ONELEVEL if !defined $main::success;
1137
1138
1139 release_allocation();
1140 freeze();
1141 my $collated_output = save_output_collection();
1142 Log (undef, "finish");
1143
1144 my $final_log = save_meta();
1145
1146 my $final_state;
1147 if ($collated_output && $final_log && $main::success) {
1148   $final_state = 'Complete';
1149 } else {
1150   $final_state = 'Failed';
1151 }
1152 $Job->update_attributes('state' => $final_state);
1153
1154 exit (($final_state eq 'Complete') ? 0 : 1);
1155
1156
1157
1158 sub update_progress_stats
1159 {
1160   $progress_stats_updated = time;
1161   return if !$progress_is_dirty;
1162   my ($todo, $done, $running) = (scalar @jobstep_todo,
1163                                  scalar @jobstep_done,
1164                                  scalar keys(%proc));
1165   $Job->{'tasks_summary'} ||= {};
1166   $Job->{'tasks_summary'}->{'todo'} = $todo;
1167   $Job->{'tasks_summary'}->{'done'} = $done;
1168   $Job->{'tasks_summary'}->{'running'} = $running;
1169   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1170   Log (undef, "status: $done done, $running running, $todo todo");
1171   $progress_is_dirty = 0;
1172 }
1173
1174
1175
1176 sub reapchildren
1177 {
1178   my $children_reaped = 0;
1179   my @successful_task_uuids = ();
1180
1181   while((my $pid = waitpid (-1, WNOHANG)) > 0)
1182   {
1183     my $childstatus = $?;
1184
1185     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1186                     . "."
1187                     . $slot[$proc{$pid}->{slot}]->{cpu});
1188     my $jobstepidx = $proc{$pid}->{jobstepidx};
1189
1190     readfrompipes_after_exit ($jobstepidx);
1191
1192     $children_reaped++;
1193     my $elapsed = time - $proc{$pid}->{time};
1194     my $Jobstep = $jobstep[$jobstepidx];
1195
1196     my $exitvalue = $childstatus >> 8;
1197     my $exitinfo = "exit ".exit_status_s($childstatus);
1198     $Jobstep->{'arvados_task'}->reload;
1199     my $task_success = $Jobstep->{'arvados_task'}->{success};
1200
1201     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1202
1203     if (!defined $task_success) {
1204       # task did not indicate one way or the other --> fail
1205       Log($jobstepidx, sprintf(
1206             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1207             exit_status_s($childstatus)));
1208       $Jobstep->{'arvados_task'}->{success} = 0;
1209       retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1210       $task_success = 0;
1211     }
1212
1213     if (!$task_success)
1214     {
1215       my $temporary_fail;
1216       $temporary_fail ||= $Jobstep->{tempfail};
1217       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1218
1219       ++$thisround_failed;
1220       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1221
1222       # Check for signs of a failed or misconfigured node
1223       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1224           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1225         # Don't count this against jobstep failure thresholds if this
1226         # node is already suspected faulty and srun exited quickly
1227         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1228             $elapsed < 5) {
1229           Log ($jobstepidx, "blaming failure on suspect node " .
1230                $slot[$proc{$pid}->{slot}]->{node}->{name});
1231           $temporary_fail ||= 1;
1232         }
1233         ban_node_by_slot($proc{$pid}->{slot});
1234       }
1235
1236       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1237                                 ++$Jobstep->{'failures'},
1238                                 $temporary_fail ? 'temporary' : 'permanent',
1239                                 $elapsed));
1240
1241       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1242         # Give up on this task, and the whole job
1243         $main::success = 0;
1244       }
1245       # Put this task back on the todo queue
1246       push @jobstep_todo, $jobstepidx;
1247       $Job->{'tasks_summary'}->{'failed'}++;
1248     }
1249     else # task_success
1250     {
1251       push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1252       ++$thisround_succeeded;
1253       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1254       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1255       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1256       push @jobstep_done, $jobstepidx;
1257       Log ($jobstepidx, "success in $elapsed seconds");
1258     }
1259     $Jobstep->{exitcode} = $childstatus;
1260     $Jobstep->{finishtime} = time;
1261     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1262     retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1263     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1264                               length($Jobstep->{'arvados_task'}->{output}),
1265                               $Jobstep->{'arvados_task'}->{output}));
1266
1267     close $reader{$jobstepidx};
1268     delete $reader{$jobstepidx};
1269     delete $slot[$proc{$pid}->{slot}]->{pid};
1270     push @freeslot, $proc{$pid}->{slot};
1271     delete $proc{$pid};
1272
1273     $progress_is_dirty = 1;
1274   }
1275
1276   if (scalar(@successful_task_uuids) > 0)
1277   {
1278     Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1279     # Load new tasks
1280     my $newtask_list = [];
1281     my $newtask_results;
1282     do {
1283       $newtask_results = api_call(
1284         "job_tasks/list",
1285         'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1286         'order' => 'qsequence',
1287         'offset' => scalar(@$newtask_list),
1288           );
1289       push(@$newtask_list, @{$newtask_results->{items}});
1290     } while (@{$newtask_results->{items}});
1291     Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1292     foreach my $arvados_task (@$newtask_list) {
1293       my $jobstep = {
1294         'level' => $arvados_task->{'sequence'},
1295         'failures' => 0,
1296         'arvados_task' => $arvados_task
1297       };
1298       push @jobstep, $jobstep;
1299       push @jobstep_todo, $#jobstep;
1300     }
1301   }
1302
1303   return $children_reaped;
1304 }
1305
1306 sub check_refresh_wanted
1307 {
1308   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1309   if (@stat &&
1310       $stat[9] > $latest_refresh &&
1311       # ...and we have actually locked the job record...
1312       $job_id eq $Job->{'uuid'}) {
1313     $latest_refresh = scalar time;
1314     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1315     for my $attr ('cancelled_at',
1316                   'cancelled_by_user_uuid',
1317                   'cancelled_by_client_uuid',
1318                   'state') {
1319       $Job->{$attr} = $Job2->{$attr};
1320     }
1321     if ($Job->{'state'} ne "Running") {
1322       if ($Job->{'state'} eq "Cancelled") {
1323         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1324       } else {
1325         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1326       }
1327       $main::success = 0;
1328       $main::please_freeze = 1;
1329     }
1330   }
1331 }
1332
1333 sub check_squeue
1334 {
1335   my $last_squeue_check = $squeue_checked;
1336
1337   # Do not call `squeue` or check the kill list more than once every
1338   # 15 seconds.
1339   return if $last_squeue_check > time - 15;
1340   $squeue_checked = time;
1341
1342   # Look for children from which we haven't received stderr data since
1343   # the last squeue check. If no such children exist, all procs are
1344   # alive and there's no need to even look at squeue.
1345   #
1346   # As long as the crunchstat poll interval (10s) is shorter than the
1347   # squeue check interval (15s) this should make the squeue check an
1348   # infrequent event.
1349   my $silent_procs = 0;
1350   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1351   {
1352     if (!exists($js->{stderr_at}))
1353     {
1354       $js->{stderr_at} = 0;
1355     }
1356     if ($js->{stderr_at} < $last_squeue_check)
1357     {
1358       $silent_procs++;
1359     }
1360   }
1361   return if $silent_procs == 0;
1362
1363   # use killem() on procs whose killtime is reached
1364   while (my ($pid, $procinfo) = each %proc)
1365   {
1366     my $js = $jobstep[$procinfo->{jobstepidx}];
1367     if (exists $procinfo->{killtime}
1368         && $procinfo->{killtime} <= time
1369         && $js->{stderr_at} < $last_squeue_check)
1370     {
1371       my $sincewhen = "";
1372       if ($js->{stderr_at}) {
1373         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1374       }
1375       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1376       killem ($pid);
1377     }
1378   }
1379
1380   if (!$have_slurm)
1381   {
1382     # here is an opportunity to check for mysterious problems with local procs
1383     return;
1384   }
1385
1386   # Get a list of steps still running.  Note: squeue(1) says --steps
1387   # selects a format (which we override anyway) and allows us to
1388   # specify which steps we're interested in (which we don't).
1389   # Importantly, it also changes the meaning of %j from "job name" to
1390   # "step name" and (although this isn't mentioned explicitly in the
1391   # docs) switches from "one line per job" mode to "one line per step"
1392   # mode. Without it, we'd just get a list of one job, instead of a
1393   # list of N steps.
1394   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1395   if ($? != 0)
1396   {
1397     Log(undef, "warning: squeue exit status $? ($!)");
1398     return;
1399   }
1400   chop @squeue;
1401
1402   # which of my jobsteps are running, according to squeue?
1403   my %ok;
1404   for my $jobstepname (@squeue)
1405   {
1406     $ok{$jobstepname} = 1;
1407   }
1408
1409   # Check for child procs >60s old and not mentioned by squeue.
1410   while (my ($pid, $procinfo) = each %proc)
1411   {
1412     if ($procinfo->{time} < time - 60
1413         && $procinfo->{jobstepname}
1414         && !exists $ok{$procinfo->{jobstepname}}
1415         && !exists $procinfo->{killtime})
1416     {
1417       # According to slurm, this task has ended (successfully or not)
1418       # -- but our srun child hasn't exited. First we must wait (30
1419       # seconds) in case this is just a race between communication
1420       # channels. Then, if our srun child process still hasn't
1421       # terminated, we'll conclude some slurm communication
1422       # error/delay has caused the task to die without notifying srun,
1423       # and we'll kill srun ourselves.
1424       $procinfo->{killtime} = time + 30;
1425       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1426     }
1427   }
1428 }
1429
1430 sub check_sinfo
1431 {
1432   # If a node fails in a multi-node "srun" call during job setup, the call
1433   # may hang instead of exiting with a nonzero code.  This function checks
1434   # "sinfo" for the health of the nodes that were allocated and ensures that
1435   # they are all still in the "alloc" state.  If a node that is allocated to
1436   # this job is not in "alloc" state, then set please_freeze.
1437   #
1438   # This is only called from srun_sync() for node configuration.  If a
1439   # node fails doing actual work, there are other recovery mechanisms.
1440
1441   # Do not call `sinfo` more than once every 15 seconds.
1442   return if $sinfo_checked > time - 15;
1443   $sinfo_checked = time;
1444
1445   # The output format "%t" means output node states.
1446   my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
1447   if ($? != 0)
1448   {
1449     Log(undef, "warning: sinfo exit status $? ($!)");
1450     return;
1451   }
1452   chop @sinfo;
1453
1454   foreach (@sinfo)
1455   {
1456     if ($_ != "alloc" && $_ != "alloc*") {
1457       $main::please_freeze = 1;
1458     }
1459   }
1460 }
1461
1462 sub release_allocation
1463 {
1464   if ($have_slurm)
1465   {
1466     Log (undef, "release job allocation");
1467     system "scancel $ENV{SLURM_JOB_ID}";
1468   }
1469 }
1470
1471
1472 sub readfrompipes
1473 {
1474   my $gotsome = 0;
1475   my %fd_job;
1476   my $sel = IO::Select->new();
1477   foreach my $jobstepidx (keys %reader)
1478   {
1479     my $fd = $reader{$jobstepidx};
1480     $sel->add($fd);
1481     $fd_job{$fd} = $jobstepidx;
1482
1483     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1484       $sel->add($stdout_fd);
1485       $fd_job{$stdout_fd} = $jobstepidx;
1486     }
1487   }
1488   # select on all reader fds with 0.1s timeout
1489   my @ready_fds = $sel->can_read(0.1);
1490   foreach my $fd (@ready_fds)
1491   {
1492     my $buf;
1493     if (0 < sysread ($fd, $buf, 65536))
1494     {
1495       $gotsome = 1;
1496       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1497
1498       my $jobstepidx = $fd_job{$fd};
1499       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1500         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1501         next;
1502       }
1503
1504       $jobstep[$jobstepidx]->{stderr_at} = time;
1505       $jobstep[$jobstepidx]->{stderr} .= $buf;
1506
1507       # Consume everything up to the last \n
1508       preprocess_stderr ($jobstepidx);
1509
1510       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1511       {
1512         # If we get a lot of stderr without a newline, chop off the
1513         # front to avoid letting our buffer grow indefinitely.
1514         substr ($jobstep[$jobstepidx]->{stderr},
1515                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1516       }
1517     }
1518   }
1519   return $gotsome;
1520 }
1521
1522
1523 # Consume all full lines of stderr for a jobstep. Everything after the
1524 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1525 # returning.
1526 sub preprocess_stderr
1527 {
1528   my $jobstepidx = shift;
1529   # slotindex is only defined for children running Arvados job tasks.
1530   # Be prepared to handle the undef case (for setup srun calls, etc.).
1531   my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1532
1533   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1534     my $line = $1;
1535     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1536     Log ($jobstepidx, "stderr $line");
1537     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
1538       # If the allocation is revoked, we can't possibly continue, so mark all
1539       # nodes as failed.  This will cause the overall exit code to be
1540       # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
1541       # this job.
1542       $main::please_freeze = 1;
1543       foreach my $st (@slot) {
1544         $st->{node}->{fail_count}++;
1545       }
1546     }
1547     elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
1548       $jobstep[$jobstepidx]->{tempfail} = 1;
1549       if (defined($job_slot_index)) {
1550         $slot[$job_slot_index]->{node}->{fail_count}++;
1551         ban_node_by_slot($job_slot_index);
1552       }
1553     }
1554     elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
1555       $jobstep[$jobstepidx]->{tempfail} = 1;
1556       ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
1557     }
1558     elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1559       $jobstep[$jobstepidx]->{tempfail} = 1;
1560     }
1561   }
1562 }
1563
1564
1565 # Read whatever is still available on its stderr+stdout pipes after
1566 # the given child process has exited.
1567 sub readfrompipes_after_exit
1568 {
1569   my $jobstepidx = shift;
1570
1571   # The fact that the child has exited allows some convenient
1572   # simplifications: (1) all data must have already been written, so
1573   # there's no need to wait for more once sysread returns 0; (2) the
1574   # total amount of data available is bounded by the pipe buffer size,
1575   # so it's safe to read everything into one string.
1576   my $buf;
1577   while (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) {
1578     $jobstep[$jobstepidx]->{stderr_at} = time;
1579     $jobstep[$jobstepidx]->{stderr} .= $buf;
1580   }
1581   if ($jobstep[$jobstepidx]->{stdout_r}) {
1582     while (0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) {
1583       $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1584     }
1585   }
1586   preprocess_stderr ($jobstepidx);
1587
1588   map {
1589     Log ($jobstepidx, "stderr $_");
1590   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1591   $jobstep[$jobstepidx]->{stderr} = '';
1592 }
1593
1594 sub fetch_block
1595 {
1596   my $hash = shift;
1597   my $keep;
1598   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1599     Log(undef, "fetch_block run error from arv-get $hash: $!");
1600     return undef;
1601   }
1602   my $output_block = "";
1603   while (1) {
1604     my $buf;
1605     my $bytes = sysread($keep, $buf, 1024 * 1024);
1606     if (!defined $bytes) {
1607       Log(undef, "fetch_block read error from arv-get: $!");
1608       $output_block = undef;
1609       last;
1610     } elsif ($bytes == 0) {
1611       # sysread returns 0 at the end of the pipe.
1612       last;
1613     } else {
1614       # some bytes were read into buf.
1615       $output_block .= $buf;
1616     }
1617   }
1618   close $keep;
1619   if ($?) {
1620     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1621     $output_block = undef;
1622   }
1623   return $output_block;
1624 }
1625
1626 # Create a collection by concatenating the output of all tasks (each
1627 # task's output is either a manifest fragment, a locator for a
1628 # manifest fragment stored in Keep, or nothing at all). Return the
1629 # portable_data_hash of the new collection.
1630 sub create_output_collection
1631 {
1632   Log (undef, "collate");
1633
1634   my ($child_out, $child_in);
1635   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1636 import arvados
1637 import sys
1638 print (arvados.api("v1").collections().
1639        create(body={"manifest_text": sys.stdin.read(),
1640                     "owner_uuid": sys.argv[2]}).
1641        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1642 }, retry_count(), $Job->{owner_uuid});
1643
1644   my $task_idx = -1;
1645   my $manifest_size = 0;
1646   for (@jobstep)
1647   {
1648     ++$task_idx;
1649     my $output = $_->{'arvados_task'}->{output};
1650     next if (!defined($output));
1651     my $next_write;
1652     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1653       $next_write = fetch_block($output);
1654     } else {
1655       $next_write = $output;
1656     }
1657     if (defined($next_write)) {
1658       if (!defined(syswrite($child_in, $next_write))) {
1659         # There's been an error writing.  Stop the loop.
1660         # We'll log details about the exit code later.
1661         last;
1662       } else {
1663         $manifest_size += length($next_write);
1664       }
1665     } else {
1666       my $uuid = $_->{'arvados_task'}->{'uuid'};
1667       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1668       $main::success = 0;
1669     }
1670   }
1671   close($child_in);
1672   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1673
1674   my $joboutput;
1675   my $s = IO::Select->new($child_out);
1676   if ($s->can_read(120)) {
1677     sysread($child_out, $joboutput, 1024 * 1024);
1678     waitpid($pid, 0);
1679     if ($?) {
1680       Log(undef, "output collection creation exited " . exit_status_s($?));
1681       $joboutput = undef;
1682     } else {
1683       chomp($joboutput);
1684     }
1685   } else {
1686     Log (undef, "timed out while creating output collection");
1687     foreach my $signal (2, 2, 2, 15, 15, 9) {
1688       kill($signal, $pid);
1689       last if waitpid($pid, WNOHANG) == -1;
1690       sleep(1);
1691     }
1692   }
1693   close($child_out);
1694
1695   return $joboutput;
1696 }
1697
1698 # Calls create_output_collection, logs the result, and returns it.
1699 # If that was successful, save that as the output in the job record.
1700 sub save_output_collection {
1701   my $collated_output = create_output_collection();
1702
1703   if (!$collated_output) {
1704     Log(undef, "Failed to write output collection");
1705   }
1706   else {
1707     Log(undef, "job output $collated_output");
1708     $Job->update_attributes('output' => $collated_output);
1709   }
1710   return $collated_output;
1711 }
1712
1713 sub killem
1714 {
1715   foreach (@_)
1716   {
1717     my $sig = 2;                # SIGINT first
1718     if (exists $proc{$_}->{"sent_$sig"} &&
1719         time - $proc{$_}->{"sent_$sig"} > 4)
1720     {
1721       $sig = 15;                # SIGTERM if SIGINT doesn't work
1722     }
1723     if (exists $proc{$_}->{"sent_$sig"} &&
1724         time - $proc{$_}->{"sent_$sig"} > 4)
1725     {
1726       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1727     }
1728     if (!exists $proc{$_}->{"sent_$sig"})
1729     {
1730       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1731       kill $sig, $_;
1732       select (undef, undef, undef, 0.1);
1733       if ($sig == 2)
1734       {
1735         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1736       }
1737       $proc{$_}->{"sent_$sig"} = time;
1738       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1739     }
1740   }
1741 }
1742
1743
1744 sub fhbits
1745 {
1746   my($bits);
1747   for (@_) {
1748     vec($bits,fileno($_),1) = 1;
1749   }
1750   $bits;
1751 }
1752
1753
1754 # Send log output to Keep via arv-put.
1755 #
1756 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1757 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1758 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1759 # $log_pipe_pid is the pid of the arv-put subprocess.
1760 #
1761 # The only functions that should access these variables directly are:
1762 #
1763 # log_writer_start($logfilename)
1764 #     Starts an arv-put pipe, reading data on stdin and writing it to
1765 #     a $logfilename file in an output collection.
1766 #
1767 # log_writer_read_output([$timeout])
1768 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1769 #     Passes $timeout to the select() call, with a default of 0.01.
1770 #     Returns the result of the last read() call on $log_pipe_out, or
1771 #     -1 if read() wasn't called because select() timed out.
1772 #     Only other log_writer_* functions should need to call this.
1773 #
1774 # log_writer_send($txt)
1775 #     Writes $txt to the output log collection.
1776 #
1777 # log_writer_finish()
1778 #     Closes the arv-put pipe and returns the output that it produces.
1779 #
1780 # log_writer_is_active()
1781 #     Returns a true value if there is currently a live arv-put
1782 #     process, false otherwise.
1783 #
1784 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1785     $log_pipe_pid);
1786
1787 sub log_writer_start($)
1788 {
1789   my $logfilename = shift;
1790   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1791                         'arv-put',
1792                         '--stream',
1793                         '--retries', '6',
1794                         '--filename', $logfilename,
1795                         '-');
1796   $log_pipe_out_buf = "";
1797   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1798 }
1799
1800 sub log_writer_read_output {
1801   my $timeout = shift || 0.01;
1802   my $read = -1;
1803   while ($read && $log_pipe_out_select->can_read($timeout)) {
1804     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1805                  length($log_pipe_out_buf));
1806   }
1807   if (!defined($read)) {
1808     Log(undef, "error reading log manifest from arv-put: $!");
1809   }
1810   return $read;
1811 }
1812
1813 sub log_writer_send($)
1814 {
1815   my $txt = shift;
1816   print $log_pipe_in $txt;
1817   log_writer_read_output();
1818 }
1819
1820 sub log_writer_finish()
1821 {
1822   return unless $log_pipe_pid;
1823
1824   close($log_pipe_in);
1825
1826   my $logger_failed = 0;
1827   my $read_result = log_writer_read_output(600);
1828   if ($read_result == -1) {
1829     $logger_failed = -1;
1830     Log (undef, "timed out reading from 'arv-put'");
1831   } elsif ($read_result != 0) {
1832     $logger_failed = -2;
1833     Log(undef, "failed to read arv-put log manifest to EOF");
1834   }
1835
1836   waitpid($log_pipe_pid, 0);
1837   if ($?) {
1838     $logger_failed ||= $?;
1839     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1840   }
1841
1842   close($log_pipe_out);
1843   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1844   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1845       $log_pipe_out_select = undef;
1846
1847   return $arv_put_output;
1848 }
1849
1850 sub log_writer_is_active() {
1851   return $log_pipe_pid;
1852 }
1853
1854 sub Log                         # ($jobstepidx, $logmessage)
1855 {
1856   my ($jobstepidx, $logmessage) = @_;
1857   if ($logmessage =~ /\n/) {
1858     for my $line (split (/\n/, $_[1])) {
1859       Log ($jobstepidx, $line);
1860     }
1861     return;
1862   }
1863   my $fh = select STDERR; $|=1; select $fh;
1864   my $task_qseq = '';
1865   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1866     $task_qseq = $jobstepidx;
1867   }
1868   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1869   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1870   $message .= "\n";
1871   my $datetime;
1872   if (log_writer_is_active() || -t STDERR) {
1873     my @gmtime = gmtime;
1874     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1875                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1876   }
1877   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1878
1879   if (log_writer_is_active()) {
1880     log_writer_send($datetime . " " . $message);
1881   }
1882 }
1883
1884
1885 sub croak
1886 {
1887   my ($package, $file, $line) = caller;
1888   my $message = "@_ at $file line $line\n";
1889   Log (undef, $message);
1890   release_allocation();
1891   freeze() if @jobstep_todo;
1892   create_output_collection() if @jobstep_todo;
1893   cleanup();
1894   save_meta();
1895   die;
1896 }
1897
1898
1899 sub cleanup
1900 {
1901   return unless $Job;
1902   if ($Job->{'state'} eq 'Cancelled') {
1903     $Job->update_attributes('finished_at' => scalar gmtime);
1904   } else {
1905     $Job->update_attributes('state' => 'Failed');
1906   }
1907 }
1908
1909
1910 sub save_meta
1911 {
1912   my $justcheckpoint = shift; # false if this will be the last meta saved
1913   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1914   return unless log_writer_is_active();
1915   my $log_manifest = log_writer_finish();
1916   return unless defined($log_manifest);
1917
1918   if ($Job->{log}) {
1919     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1920     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1921   }
1922
1923   my $log_coll = api_call(
1924     "collections/create", ensure_unique_name => 1, collection => {
1925       manifest_text => $log_manifest,
1926       owner_uuid => $Job->{owner_uuid},
1927       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1928     });
1929   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1930   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1931
1932   return $log_coll->{portable_data_hash};
1933 }
1934
1935
1936 sub freeze_if_want_freeze
1937 {
1938   if ($main::please_freeze)
1939   {
1940     release_allocation();
1941     if (@_)
1942     {
1943       # kill some srun procs before freeze+stop
1944       map { $proc{$_} = {} } @_;
1945       while (%proc)
1946       {
1947         killem (keys %proc);
1948         select (undef, undef, undef, 0.1);
1949         my $died;
1950         while (($died = waitpid (-1, WNOHANG)) > 0)
1951         {
1952           delete $proc{$died};
1953         }
1954       }
1955     }
1956     freeze();
1957     create_output_collection();
1958     cleanup();
1959     save_meta();
1960     exit 1;
1961   }
1962 }
1963
1964
1965 sub freeze
1966 {
1967   Log (undef, "Freeze not implemented");
1968   return;
1969 }
1970
1971
1972 sub thaw
1973 {
1974   croak ("Thaw not implemented");
1975 }
1976
1977
1978 sub freezequote
1979 {
1980   my $s = shift;
1981   $s =~ s/\\/\\\\/g;
1982   $s =~ s/\n/\\n/g;
1983   return $s;
1984 }
1985
1986
1987 sub freezeunquote
1988 {
1989   my $s = shift;
1990   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1991   return $s;
1992 }
1993
1994 sub srun_sync
1995 {
1996   my $srunargs = shift;
1997   my $execargs = shift;
1998   my $opts = shift || {};
1999   my $stdin = shift;
2000
2001   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
2002   Log (undef, "$label: start");
2003
2004   my ($stderr_r, $stderr_w);
2005   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
2006
2007   my ($stdout_r, $stdout_w);
2008   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
2009
2010   my $srunpid = fork();
2011   if ($srunpid == 0)
2012   {
2013     close($stderr_r);
2014     close($stdout_r);
2015     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
2016     fcntl($stdout_w, F_SETFL, 0) or croak($!);
2017     open(STDERR, ">&", $stderr_w) or croak ($!);
2018     open(STDOUT, ">&", $stdout_w) or croak ($!);
2019     srun ($srunargs, $execargs, $opts, $stdin);
2020     exit (1);
2021   }
2022   close($stderr_w);
2023   close($stdout_w);
2024
2025   set_nonblocking($stderr_r);
2026   set_nonblocking($stdout_r);
2027
2028   # Add entries to @jobstep and %proc so check_squeue() and
2029   # freeze_if_want_freeze() can treat it like a job task process.
2030   push @jobstep, {
2031     stderr => '',
2032     stderr_at => 0,
2033     stderr_captured => '',
2034     stdout_r => $stdout_r,
2035     stdout_captured => '',
2036   };
2037   my $jobstepidx = $#jobstep;
2038   $proc{$srunpid} = {
2039     jobstepidx => $jobstepidx,
2040   };
2041   $reader{$jobstepidx} = $stderr_r;
2042
2043   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
2044     my $busy = readfrompipes();
2045     if (!$busy || ($latest_refresh + 2 < scalar time)) {
2046       check_refresh_wanted();
2047       check_squeue();
2048       check_sinfo();
2049     }
2050     if (!$busy) {
2051       select(undef, undef, undef, 0.1);
2052     }
2053     killem(keys %proc) if $main::please_freeze;
2054   }
2055   my $exited = $?;
2056
2057   readfrompipes_after_exit ($jobstepidx);
2058
2059   Log (undef, "$label: exit ".exit_status_s($exited));
2060
2061   close($stdout_r);
2062   close($stderr_r);
2063   delete $proc{$srunpid};
2064   delete $reader{$jobstepidx};
2065
2066   my $j = pop @jobstep;
2067   # If the srun showed signs of tempfail, ensure the caller treats that as a
2068   # failure case.
2069   if ($main::please_freeze || $j->{tempfail}) {
2070     $exited ||= 255;
2071   }
2072   return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
2073 }
2074
2075
2076 sub srun
2077 {
2078   my $srunargs = shift;
2079   my $execargs = shift;
2080   my $opts = shift || {};
2081   my $stdin = shift;
2082   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
2083
2084   $Data::Dumper::Terse = 1;
2085   $Data::Dumper::Indent = 0;
2086   my $show_cmd = Dumper($args);
2087   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2088   $show_cmd =~ s/\n/ /g;
2089   if ($opts->{fork}) {
2090     Log(undef, "starting: $show_cmd");
2091   } else {
2092     # This is a child process: parent is in charge of reading our
2093     # stderr and copying it to Log() if needed.
2094     warn "starting: $show_cmd\n";
2095   }
2096
2097   if (defined $stdin) {
2098     my $child = open STDIN, "-|";
2099     defined $child or die "no fork: $!";
2100     if ($child == 0) {
2101       print $stdin or die $!;
2102       close STDOUT or die $!;
2103       exit 0;
2104     }
2105   }
2106
2107   return system (@$args) if $opts->{fork};
2108
2109   exec @$args;
2110   warn "ENV size is ".length(join(" ",%ENV));
2111   die "exec failed: $!: @$args";
2112 }
2113
2114
2115 sub ban_node_by_slot {
2116   # Don't start any new jobsteps on this node for 60 seconds
2117   my $slotid = shift;
2118   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2119   $slot[$slotid]->{node}->{hold_count}++;
2120   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2121 }
2122
2123 sub must_lock_now
2124 {
2125   my ($lockfile, $error_message) = @_;
2126   open L, ">", $lockfile or croak("$lockfile: $!");
2127   if (!flock L, LOCK_EX|LOCK_NB) {
2128     croak("Can't lock $lockfile: $error_message\n");
2129   }
2130 }
2131
2132 sub find_docker_image {
2133   # Given a Keep locator, check to see if it contains a Docker image.
2134   # If so, return its stream name and Docker hash.
2135   # If not, return undef for both values.
2136   my $locator = shift;
2137   my ($streamname, $filename);
2138   my $image = api_call("collections/get", uuid => $locator);
2139   if ($image) {
2140     foreach my $line (split(/\n/, $image->{manifest_text})) {
2141       my @tokens = split(/\s+/, $line);
2142       next if (!@tokens);
2143       $streamname = shift(@tokens);
2144       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2145         if (defined($filename)) {
2146           return (undef, undef);  # More than one file in the Collection.
2147         } else {
2148           $filename = (split(/:/, $filedata, 3))[2];
2149         }
2150       }
2151     }
2152   }
2153   if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
2154     return ($streamname, $1);
2155   } else {
2156     return (undef, undef);
2157   }
2158 }
2159
2160 sub exit_retry_unlocked {
2161   Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED);
2162   exit(EX_RETRY_UNLOCKED);
2163 }
2164
2165 sub retry_count {
2166   # Calculate the number of times an operation should be retried,
2167   # assuming exponential backoff, and that we're willing to retry as
2168   # long as tasks have been running.  Enforce a minimum of 3 retries.
2169   my ($starttime, $endtime, $timediff, $retries);
2170   if (@jobstep) {
2171     $starttime = $jobstep[0]->{starttime};
2172     $endtime = $jobstep[-1]->{finishtime};
2173   }
2174   if (!defined($starttime)) {
2175     $timediff = 0;
2176   } elsif (!defined($endtime)) {
2177     $timediff = time - $starttime;
2178   } else {
2179     $timediff = ($endtime - $starttime) - (time - $endtime);
2180   }
2181   if ($timediff > 0) {
2182     $retries = int(log($timediff) / log(2));
2183   } else {
2184     $retries = 1;  # Use the minimum.
2185   }
2186   return ($retries > 3) ? $retries : 3;
2187 }
2188
2189 sub retry_op {
2190   # Pass in two function references.
2191   # This method will be called with the remaining arguments.
2192   # If it dies, retry it with exponential backoff until it succeeds,
2193   # or until the current retry_count is exhausted.  After each failure
2194   # that can be retried, the second function will be called with
2195   # the current try count (0-based), next try time, and error message.
2196   my $operation = shift;
2197   my $op_text = shift;
2198   my $retries = retry_count();
2199   my $retry_callback = sub {
2200     my ($try_count, $next_try_at, $errmsg) = @_;
2201     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2202     $errmsg =~ s/\s/ /g;
2203     $errmsg =~ s/\s+$//;
2204     my $retry_msg;
2205     if ($next_try_at < time) {
2206       $retry_msg = "Retrying.";
2207     } else {
2208       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2209       $retry_msg = "Retrying at $next_try_fmt.";
2210     }
2211     Log(undef, "$op_text failed: $errmsg. $retry_msg");
2212   };
2213   foreach my $try_count (0..$retries) {
2214     my $next_try = time + (2 ** $try_count);
2215     my $result = eval { $operation->(@_); };
2216     if (!$@) {
2217       return $result;
2218     } elsif ($try_count < $retries) {
2219       $retry_callback->($try_count, $next_try, $@);
2220       my $sleep_time = $next_try - time;
2221       sleep($sleep_time) if ($sleep_time > 0);
2222     }
2223   }
2224   # Ensure the error message ends in a newline, so Perl doesn't add
2225   # retry_op's line number to it.
2226   chomp($@);
2227   die($@ . "\n");
2228 }
2229
2230 sub api_call {
2231   # Pass in a /-separated API method name, and arguments for it.
2232   # This function will call that method, retrying as needed until
2233   # the current retry_count is exhausted, with a log on the first failure.
2234   my $method_name = shift;
2235   my $method = $arv;
2236   foreach my $key (split(/\//, $method_name)) {
2237     $method = $method->{$key};
2238   }
2239   return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
2240 }
2241
2242 sub exit_status_s {
2243   # Given a $?, return a human-readable exit code string like "0" or
2244   # "1" or "0 with signal 1" or "1 with signal 11".
2245   my $exitcode = shift;
2246   my $s = $exitcode >> 8;
2247   if ($exitcode & 0x7f) {
2248     $s .= " with signal " . ($exitcode & 0x7f);
2249   }
2250   if ($exitcode & 0x80) {
2251     $s .= " with core dump";
2252   }
2253   return $s;
2254 }
2255
2256 sub handle_readall {
2257   # Pass in a glob reference to a file handle.
2258   # Read all its contents and return them as a string.
2259   my $fh_glob_ref = shift;
2260   local $/ = undef;
2261   return <$fh_glob_ref>;
2262 }
2263
2264 sub tar_filename_n {
2265   my $n = shift;
2266   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2267 }
2268
2269 sub add_git_archive {
2270   # Pass in a git archive command as a string or list, a la system().
2271   # This method will save its output to be included in the archive sent to the
2272   # build script.
2273   my $git_input;
2274   $git_tar_count++;
2275   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2276     croak("Failed to save git archive: $!");
2277   }
2278   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2279   close($git_input);
2280   waitpid($git_pid, 0);
2281   close(GIT_ARCHIVE);
2282   if ($?) {
2283     croak("Failed to save git archive: git exited " . exit_status_s($?));
2284   }
2285 }
2286
2287 sub combined_git_archive {
2288   # Combine all saved tar archives into a single archive, then return its
2289   # contents in a string.  Return undef if no archives have been saved.
2290   if ($git_tar_count < 1) {
2291     return undef;
2292   }
2293   my $base_tar_name = tar_filename_n(1);
2294   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2295     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2296     if ($tar_exit != 0) {
2297       croak("Error preparing build archive: tar -A exited " .
2298             exit_status_s($tar_exit));
2299     }
2300   }
2301   if (!open(GIT_TAR, "<", $base_tar_name)) {
2302     croak("Could not open build archive: $!");
2303   }
2304   my $tar_contents = handle_readall(\*GIT_TAR);
2305   close(GIT_TAR);
2306   return $tar_contents;
2307 }
2308
2309 sub set_nonblocking {
2310   my $fh = shift;
2311   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2312   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2313 }
2314
2315 __DATA__
2316 #!/usr/bin/env perl
2317 #
2318 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2319 # server invokes this script on individual compute nodes, or localhost if we're
2320 # running a job locally.  It gets called in two modes:
2321 #
2322 # * No arguments: Installation mode.  Read a tar archive from the DATA
2323 #   file handle; it includes the Crunch script's source code, and
2324 #   maybe SDKs as well.  Those should be installed in the proper
2325 #   locations.  This runs outside of any Docker container, so don't try to
2326 #   introspect Crunch's runtime environment.
2327 #
2328 # * With arguments: Crunch script run mode.  This script should set up the
2329 #   environment, then run the command specified in the arguments.  This runs
2330 #   inside any Docker container.
2331
2332 use Fcntl ':flock';
2333 use File::Path qw( make_path remove_tree );
2334 use POSIX qw(getcwd);
2335
2336 use constant TASK_TEMPFAIL => 111;
2337
2338 # Map SDK subdirectories to the path environments they belong to.
2339 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2340
2341 my $destdir = $ENV{"CRUNCH_SRC"};
2342 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2343 my $repo = $ENV{"CRUNCH_SRC_URL"};
2344 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2345 my $job_work = $ENV{"JOB_WORK"};
2346 my $task_work = $ENV{"TASK_WORK"};
2347
2348 open(STDOUT_ORIG, ">&", STDOUT);
2349 open(STDERR_ORIG, ">&", STDERR);
2350
2351 for my $dir ($destdir, $job_work, $task_work) {
2352   if ($dir) {
2353     make_path $dir;
2354     -e $dir or die "Failed to create temporary directory ($dir): $!";
2355   }
2356 }
2357
2358 if ($task_work) {
2359   remove_tree($task_work, {keep_root => 1});
2360 }
2361
2362 ### Crunch script run mode
2363 if (@ARGV) {
2364   # We want to do routine logging during task 0 only.  This gives the user
2365   # the information they need, but avoids repeating the information for every
2366   # task.
2367   my $Log;
2368   if ($ENV{TASK_SEQUENCE} eq "0") {
2369     $Log = sub {
2370       my $msg = shift;
2371       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2372     };
2373   } else {
2374     $Log = sub { };
2375   }
2376
2377   my $python_src = "$install_dir/python";
2378   my $venv_dir = "$job_work/.arvados.venv";
2379   my $venv_built = -e "$venv_dir/bin/activate";
2380   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2381     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2382                  "--python=python2.7", $venv_dir);
2383     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2384     $venv_built = 1;
2385     $Log->("Built Python SDK virtualenv");
2386   }
2387
2388   my @pysdk_version_cmd = ("python", "-c",
2389     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2390   if ($venv_built) {
2391     $Log->("Running in Python SDK virtualenv");
2392     @pysdk_version_cmd = ();
2393     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2394     @ARGV = ("/bin/sh", "-ec",
2395              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2396   } elsif (-d $python_src) {
2397     $Log->("Warning: virtualenv not found inside Docker container default " .
2398            "\$PATH. Can't install Python SDK.");
2399   }
2400
2401   if (@pysdk_version_cmd) {
2402     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2403     my $pysdk_version = <$pysdk_version_pipe>;
2404     close($pysdk_version_pipe);
2405     if ($? == 0) {
2406       chomp($pysdk_version);
2407       $Log->("Using Arvados SDK version $pysdk_version");
2408     } else {
2409       # A lot could've gone wrong here, but pretty much all of it means that
2410       # Python won't be able to load the Arvados SDK.
2411       $Log->("Warning: Arvados SDK not found");
2412     }
2413   }
2414
2415   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2416     my $sdk_path = "$install_dir/$sdk_dir";
2417     if (-d $sdk_path) {
2418       if ($ENV{$sdk_envkey}) {
2419         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2420       } else {
2421         $ENV{$sdk_envkey} = $sdk_path;
2422       }
2423       $Log->("Arvados SDK added to %s", $sdk_envkey);
2424     }
2425   }
2426
2427   exec(@ARGV);
2428   die "Cannot exec `@ARGV`: $!";
2429 }
2430
2431 ### Installation mode
2432 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2433 flock L, LOCK_EX;
2434 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2435   # This exact git archive (source + arvados sdk) is already installed
2436   # here, so there's no need to reinstall it.
2437
2438   # We must consume our DATA section, though: otherwise the process
2439   # feeding it to us will get SIGPIPE.
2440   my $buf;
2441   while (read(DATA, $buf, 65536)) { }
2442
2443   exit(0);
2444 }
2445
2446 unlink "$destdir.archive_hash";
2447 mkdir $destdir;
2448
2449 do {
2450   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2451   local $SIG{PIPE} = "IGNORE";
2452   warn "Extracting archive: $archive_hash\n";
2453   # --ignore-zeros is necessary sometimes: depending on how much NUL
2454   # padding tar -A put on our combined archive (which in turn depends
2455   # on the length of the component archives) tar without
2456   # --ignore-zeros will exit before consuming stdin and cause close()
2457   # to fail on the resulting SIGPIPE.
2458   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2459     die "Error launching 'tar -xC $destdir': $!";
2460   }
2461   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2462   # get SIGPIPE.  We must feed it data incrementally.
2463   my $tar_input;
2464   while (read(DATA, $tar_input, 65536)) {
2465     print TARX $tar_input;
2466   }
2467   if(!close(TARX)) {
2468     die "'tar -xC $destdir' exited $?: $!";
2469   }
2470 };
2471
2472 mkdir $install_dir;
2473
2474 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2475 if (-d $sdk_root) {
2476   foreach my $sdk_lang (("python",
2477                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2478     if (-d "$sdk_root/$sdk_lang") {
2479       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2480         die "Failed to install $sdk_lang SDK: $!";
2481       }
2482     }
2483   }
2484 }
2485
2486 my $python_dir = "$install_dir/python";
2487 if ((-d $python_dir) and can_run("python2.7")) {
2488   open(my $egg_info_pipe, "-|",
2489        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2490   my @egg_info_errors = <$egg_info_pipe>;
2491   close($egg_info_pipe);
2492
2493   if ($?) {
2494     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2495       # egg_info apparently failed because it couldn't ask git for a build tag.
2496       # Specify no build tag.
2497       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2498       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2499       close($pysdk_cfg);
2500     } else {
2501       my $egg_info_exit = $? >> 8;
2502       foreach my $errline (@egg_info_errors) {
2503         warn $errline;
2504       }
2505       warn "python setup.py egg_info failed: exit $egg_info_exit";
2506       exit ($egg_info_exit || 1);
2507     }
2508   }
2509 }
2510
2511 # Hide messages from the install script (unless it fails: shell_or_die
2512 # will show $destdir.log in that case).
2513 open(STDOUT, ">>", "$destdir.log") or die ($!);
2514 open(STDERR, ">&", STDOUT) or die ($!);
2515
2516 if (-e "$destdir/crunch_scripts/install") {
2517     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2518 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2519     # Old version
2520     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2521 } elsif (-e "./install.sh") {
2522     shell_or_die (undef, "./install.sh", $install_dir);
2523 }
2524
2525 if ($archive_hash) {
2526     unlink "$destdir.archive_hash.new";
2527     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2528     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2529 }
2530
2531 close L;
2532
2533 sub can_run {
2534   my $command_name = shift;
2535   open(my $which, "-|", "which", $command_name) or die ($!);
2536   while (<$which>) { }
2537   close($which);
2538   return ($? == 0);
2539 }
2540
2541 sub shell_or_die
2542 {
2543   my $exitcode = shift;
2544
2545   if ($ENV{"DEBUG"}) {
2546     print STDERR "@_\n";
2547   }
2548   if (system (@_) != 0) {
2549     my $err = $!;
2550     my $code = $?;
2551     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2552     open STDERR, ">&STDERR_ORIG";
2553     system ("cat $destdir.log >&2");
2554     warn "@_ failed ($err): $exitstatus";
2555     if (defined($exitcode)) {
2556       exit $exitcode;
2557     }
2558     else {
2559       exit (($code >> 8) || 1);
2560     }
2561   }
2562 }
2563
2564 __DATA__