Merge branch 'master' into 14075-uploadfiles
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3 # Copyright (C) The Arvados Authors. All rights reserved.
4 #
5 # SPDX-License-Identifier: AGPL-3.0
6
7 =head1 NAME
8
9 crunch-job: Execute job steps, save snapshots as requested, collate output.
10
11 =head1 SYNOPSIS
12
13 Obtain job details from Arvados, run tasks on compute nodes (typically
14 invoked by scheduler on controller):
15
16  crunch-job --job x-y-z --git-dir /path/to/repo/.git
17
18 Obtain job details from command line, run tasks on local machine
19 (typically invoked by application or developer on VM):
20
21  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
22
23  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
24
25 =head1 OPTIONS
26
27 =over
28
29 =item --force-unlock
30
31 If the job is already locked, steal the lock and run it anyway.
32
33 =item --git-dir
34
35 Path to a .git directory (or a git URL) where the commit given in the
36 job's C<script_version> attribute is to be found. If this is I<not>
37 given, the job's C<repository> attribute will be used.
38
39 =item --job-api-token
40
41 Arvados API authorization token to use during the course of the job.
42
43 =item --no-clear-tmp
44
45 Do not clear per-job/task temporary directories during initial job
46 setup. This can speed up development and debugging when running jobs
47 locally.
48
49 =item --job
50
51 UUID of the job to run, or a JSON-encoded job resource without a
52 UUID. If the latter is given, a new job object will be created.
53
54 =back
55
56 =head1 RUNNING JOBS LOCALLY
57
58 crunch-job's log messages appear on stderr along with the job tasks'
59 stderr streams. The log is saved in Keep at each checkpoint and when
60 the job finishes.
61
62 If the job succeeds, the job's output locator is printed on stdout.
63
64 While the job is running, the following signals are accepted:
65
66 =over
67
68 =item control-C, SIGINT, SIGQUIT
69
70 Save a checkpoint, terminate any job tasks that are running, and stop.
71
72 =item SIGALRM
73
74 Save a checkpoint and continue.
75
76 =item SIGHUP
77
78 Refresh node allocation (i.e., check whether any nodes have been added
79 or unallocated) and attributes of the Job record that should affect
80 behavior (e.g., cancel job if cancelled_at becomes non-nil).
81
82 =back
83
84 =cut
85
86
87 use strict;
88 use POSIX ':sys_wait_h';
89 use POSIX qw(strftime);
90 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
91 use Arvados;
92 use Cwd qw(realpath);
93 use Data::Dumper;
94 use Digest::MD5 qw(md5_hex);
95 use Getopt::Long;
96 use IPC::Open2;
97 use IO::Select;
98 use File::Temp;
99 use Fcntl ':flock';
100 use File::Path qw( make_path remove_tree );
101
102 use constant TASK_TEMPFAIL => 111;
103 use constant EX_TEMPFAIL => 75;
104 use constant EX_RETRY_UNLOCKED => 93;
105
106 $ENV{"TMPDIR"} ||= "/tmp";
107 unless (defined $ENV{"CRUNCH_TMP"}) {
108   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
109   if ($ENV{"USER"} ne "crunch" && $< != 0) {
110     # use a tmp dir unique for my uid
111     $ENV{"CRUNCH_TMP"} .= "-$<";
112   }
113 }
114
115 # Create the tmp directory if it does not exist
116 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
117   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
118 }
119
120 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
121 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
122 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
123 mkdir ($ENV{"JOB_WORK"});
124
125 my %proc;
126 my $force_unlock;
127 my $git_dir;
128 my $jobspec;
129 my $job_api_token;
130 my $no_clear_tmp;
131 my $resume_stash;
132 my $cgroup_root = "/sys/fs/cgroup";
133 my $docker_bin = "docker.io";
134 my $docker_run_args = "";
135 my $srun_sync_timeout = 15*60;
136 GetOptions('force-unlock' => \$force_unlock,
137            'git-dir=s' => \$git_dir,
138            'job=s' => \$jobspec,
139            'job-api-token=s' => \$job_api_token,
140            'no-clear-tmp' => \$no_clear_tmp,
141            'resume-stash=s' => \$resume_stash,
142            'cgroup-root=s' => \$cgroup_root,
143            'docker-bin=s' => \$docker_bin,
144            'docker-run-args=s' => \$docker_run_args,
145            'srun-sync-timeout=i' => \$srun_sync_timeout,
146     );
147
148 if (defined $job_api_token) {
149   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
150 }
151
152 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
153
154
155 $SIG{'USR1'} = sub
156 {
157   $main::ENV{CRUNCH_DEBUG} = 1;
158 };
159 $SIG{'USR2'} = sub
160 {
161   $main::ENV{CRUNCH_DEBUG} = 0;
162 };
163
164 my $arv = Arvados->new('apiVersion' => 'v1');
165
166 my $Job;
167 my $job_id;
168 my $dbh;
169 my $sth;
170 my @jobstep;
171
172 my $local_job;
173 if ($jobspec =~ /^[-a-z\d]+$/)
174 {
175   # $jobspec is an Arvados UUID, not a JSON job specification
176   $Job = api_call("jobs/get", uuid => $jobspec);
177   $local_job = 0;
178 }
179 else
180 {
181   $local_job = JSON::decode_json($jobspec);
182 }
183
184
185 # Make sure our workers (our slurm nodes, localhost, or whatever) are
186 # at least able to run basic commands: they aren't down or severely
187 # misconfigured.
188 my $cmd = ['true'];
189 if (($Job || $local_job)->{docker_image_locator}) {
190   $cmd = [$docker_bin, 'ps', '-q'];
191 }
192 Log(undef, "Sanity check is `@$cmd`");
193 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
194   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
195   $cmd,
196   {label => "sanity check"});
197 if ($exited != 0) {
198   Log(undef, "Sanity check failed: ".exit_status_s($exited));
199   exit EX_TEMPFAIL;
200 }
201 Log(undef, "Sanity check OK");
202
203
204 my $User = api_call("users/current");
205
206 if (!$local_job) {
207   if (!$force_unlock) {
208     # Claim this job, and make sure nobody else does
209     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
210     if ($@) {
211       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
212       exit EX_TEMPFAIL;
213     };
214   }
215 }
216 else
217 {
218   if (!$resume_stash)
219   {
220     map { croak ("No $_ specified") unless $local_job->{$_} }
221     qw(script script_version script_parameters);
222   }
223
224   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
225   $local_job->{'started_at'} = gmtime;
226   $local_job->{'state'} = 'Running';
227
228   $Job = api_call("jobs/create", job => $local_job);
229 }
230 $job_id = $Job->{'uuid'};
231
232 my $keep_logfile = $job_id . '.log.txt';
233 log_writer_start($keep_logfile);
234
235 $Job->{'runtime_constraints'} ||= {};
236 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
237 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
238
239 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
240 if ($? == 0) {
241   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
242   chomp($gem_versions);
243   chop($gem_versions);  # Closing parentheses
244 } else {
245   $gem_versions = "";
246 }
247 Log(undef,
248     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
249
250 Log (undef, "check slurm allocation");
251 my @slot;
252 my @node;
253 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
254 my @sinfo;
255 if (!$have_slurm)
256 {
257   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
258   push @sinfo, "$localcpus localhost";
259 }
260 if (exists $ENV{SLURM_NODELIST})
261 {
262   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
263 }
264 foreach (@sinfo)
265 {
266   my ($ncpus, $slurm_nodelist) = split;
267   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
268
269   my @nodelist;
270   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
271   {
272     my $nodelist = $1;
273     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
274     {
275       my $ranges = $1;
276       foreach (split (",", $ranges))
277       {
278         my ($a, $b);
279         if (/(\d+)-(\d+)/)
280         {
281           $a = $1;
282           $b = $2;
283         }
284         else
285         {
286           $a = $_;
287           $b = $_;
288         }
289         push @nodelist, map {
290           my $n = $nodelist;
291           $n =~ s/\[[-,\d]+\]/$_/;
292           $n;
293         } ($a..$b);
294       }
295     }
296     else
297     {
298       push @nodelist, $nodelist;
299     }
300   }
301   foreach my $nodename (@nodelist)
302   {
303     Log (undef, "node $nodename - $ncpus slots");
304     my $node = { name => $nodename,
305                  ncpus => $ncpus,
306                  # The number of consecutive times a task has been dispatched
307                  # to this node and failed.
308                  losing_streak => 0,
309                  # The number of consecutive times that SLURM has reported
310                  # a node failure since the last successful task.
311                  fail_count => 0,
312                  # Don't dispatch work to this node until this time
313                  # (in seconds since the epoch) has passed.
314                  hold_until => 0 };
315     foreach my $cpu (1..$ncpus)
316     {
317       push @slot, { node => $node,
318                     cpu => $cpu };
319     }
320   }
321   push @node, @nodelist;
322 }
323
324
325
326 # Ensure that we get one jobstep running on each allocated node before
327 # we start overloading nodes with concurrent steps
328
329 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
330
331
332 $Job->update_attributes(
333   'tasks_summary' => { 'failed' => 0,
334                        'todo' => 1,
335                        'running' => 0,
336                        'done' => 0 });
337
338 Log (undef, "start");
339 $SIG{'INT'} = sub { $main::please_freeze = 1; };
340 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
341 $SIG{'TERM'} = \&croak;
342 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
343 $SIG{'ALRM'} = sub { $main::please_info = 1; };
344 $SIG{'CONT'} = sub { $main::please_continue = 1; };
345 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
346
347 $main::please_freeze = 0;
348 $main::please_info = 0;
349 $main::please_continue = 0;
350 $main::please_refresh = 0;
351 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
352
353 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
354 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
355 $ENV{"JOB_UUID"} = $job_id;
356
357
358 my @jobstep_todo = ();
359 my @jobstep_done = ();
360 my @jobstep_tomerge = ();
361 my $jobstep_tomerge_level = 0;
362 my $squeue_checked = 0;
363 my $sinfo_checked = 0;
364 my $latest_refresh = scalar time;
365
366
367
368 if (defined $Job->{thawedfromkey})
369 {
370   thaw ($Job->{thawedfromkey});
371 }
372 else
373 {
374   my $first_task = api_call("job_tasks/create", job_task => {
375     'job_uuid' => $Job->{'uuid'},
376     'sequence' => 0,
377     'qsequence' => 0,
378     'parameters' => {},
379   });
380   push @jobstep, { 'level' => 0,
381                    'failures' => 0,
382                    'arvados_task' => $first_task,
383                  };
384   push @jobstep_todo, 0;
385 }
386
387
388 if (!$have_slurm)
389 {
390   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
391 }
392
393 my $build_script = handle_readall(\*DATA);
394 my $nodelist = join(",", @node);
395 my $git_tar_count = 0;
396
397 if (!defined $no_clear_tmp) {
398   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
399   # up work directories crunch_tmp/work, crunch_tmp/opt,
400   # crunch_tmp/src*.
401   my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
402     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
403     ['bash', '-ec', q{
404 arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
405 rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
406     }],
407     {label => "clean work dirs"});
408   if ($exited != 0) {
409     exit_retry_unlocked();
410   }
411 }
412
413 # If this job requires a Docker image, install that.
414 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
415 if ($docker_locator = $Job->{docker_image_locator}) {
416   Log (undef, "Install docker image $docker_locator");
417   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
418   if (!$docker_hash)
419   {
420     croak("No Docker image hash found from locator $docker_locator");
421   }
422   Log (undef, "docker image hash is $docker_hash");
423   $docker_stream =~ s/^\.//;
424   my $docker_install_script = qq{
425 loaded() {
426   id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
427   echo "image ID is \$id"
428   [[ \${id} = \Q$docker_hash\E ]]
429 }
430 if loaded >&2 2>/dev/null; then
431   echo >&2 "image is already present"
432   exit 0
433 fi
434 echo >&2 "docker image is not present; loading"
435 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
436 if ! loaded >&2; then
437   echo >&2 "`docker load` exited 0, but image is not found (!)"
438   exit 1
439 fi
440 echo >&2 "image loaded successfully"
441 };
442
443   my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
444     ["srun", "--nodelist=" . join(',', @node)],
445     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
446     {label => "load docker image"});
447   if ($exited != 0)
448   {
449     exit_retry_unlocked();
450   }
451
452   # Determine whether this version of Docker supports memory+swap limits.
453   ($exited, $stdout, $stderr, $tempfail) = srun_sync(
454     ["srun", "--nodes=1"],
455     [$docker_bin, 'run', '--help'],
456     {label => "check --memory-swap feature"});
457   if ($tempfail) {
458     exit_retry_unlocked();
459   }
460   $docker_limitmem = ($stdout =~ /--memory-swap/);
461
462   # Find a non-root Docker user to use.
463   # Tries the default user for the container, then 'crunch', then 'nobody',
464   # testing for whether the actual user id is non-zero.  This defends against
465   # mistakes but not malice, but we intend to harden the security in the future
466   # so we don't want anyone getting used to their jobs running as root in their
467   # Docker containers.
468   my @tryusers = ("", "crunch", "nobody");
469   foreach my $try_user (@tryusers) {
470     my $label;
471     my $try_user_arg;
472     if ($try_user eq "") {
473       $label = "check whether default user is UID 0";
474       $try_user_arg = "";
475     } else {
476       $label = "check whether user '$try_user' is UID 0";
477       $try_user_arg = "--user=$try_user";
478     }
479     my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
480       ["srun", "--nodes=1"],
481       ["/bin/sh", "-ec",
482        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
483       {label => $label});
484     chomp($stdout);
485     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
486       $dockeruserarg = $try_user_arg;
487       if ($try_user eq "") {
488         Log(undef, "Container will run with default user");
489       } else {
490         Log(undef, "Container will run with $dockeruserarg");
491       }
492       last;
493     } elsif ($tempfail) {
494       exit_retry_unlocked();
495     }
496   }
497
498   if (!defined $dockeruserarg) {
499     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
500   }
501
502   if ($Job->{arvados_sdk_version}) {
503     # The job also specifies an Arvados SDK version.  Add the SDKs to the
504     # tar file for the build script to install.
505     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
506                        $Job->{arvados_sdk_version}));
507     add_git_archive("git", "--git-dir=$git_dir", "archive",
508                     "--prefix=.arvados.sdk/",
509                     $Job->{arvados_sdk_version}, "sdk");
510   }
511 }
512
513 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
514   # If script_version looks like an absolute path, *and* the --git-dir
515   # argument was not given -- which implies we were not invoked by
516   # crunch-dispatch -- we will use the given path as a working
517   # directory instead of resolving script_version to a git commit (or
518   # doing anything else with git).
519   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
520   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
521 }
522 else {
523   # Resolve the given script_version to a git commit sha1. Also, if
524   # the repository is remote, clone it into our local filesystem: this
525   # ensures "git archive" will work, and is necessary to reliably
526   # resolve a symbolic script_version like "master^".
527   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
528
529   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
530
531   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
532
533   # If we're running under crunch-dispatch, it will have already
534   # pulled the appropriate source tree into its own repository, and
535   # given us that repo's path as $git_dir.
536   #
537   # If we're running a "local" job, we might have to fetch content
538   # from a remote repository.
539   #
540   # (Currently crunch-dispatch gives a local path with --git-dir, but
541   # we might as well accept URLs there too in case it changes its
542   # mind.)
543   my $repo = $git_dir || $Job->{'repository'};
544
545   # Repository can be remote or local. If remote, we'll need to fetch it
546   # to a local dir before doing `git log` et al.
547   my $repo_location;
548
549   if ($repo =~ m{://|^[^/]*:}) {
550     # $repo is a git url we can clone, like git:// or https:// or
551     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
552     # not recognized here because distinguishing that from a local
553     # path is too fragile. If you really need something strange here,
554     # use the ssh:// form.
555     $repo_location = 'remote';
556   } elsif ($repo =~ m{^\.*/}) {
557     # $repo is a local path to a git index. We'll also resolve ../foo
558     # to ../foo/.git if the latter is a directory. To help
559     # disambiguate local paths from named hosted repositories, this
560     # form must be given as ./ or ../ if it's a relative path.
561     if (-d "$repo/.git") {
562       $repo = "$repo/.git";
563     }
564     $repo_location = 'local';
565   } else {
566     # $repo is none of the above. It must be the name of a hosted
567     # repository.
568     my $arv_repo_list = api_call("repositories/list",
569                                  'filters' => [['name','=',$repo]]);
570     my @repos_found = @{$arv_repo_list->{'items'}};
571     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
572     if ($n_found > 0) {
573       Log(undef, "Repository '$repo' -> "
574           . join(", ", map { $_->{'uuid'} } @repos_found));
575     }
576     if ($n_found != 1) {
577       croak("Error: Found $n_found repositories with name '$repo'.");
578     }
579     $repo = $repos_found[0]->{'fetch_url'};
580     $repo_location = 'remote';
581   }
582   Log(undef, "Using $repo_location repository '$repo'");
583   $ENV{"CRUNCH_SRC_URL"} = $repo;
584
585   # Resolve given script_version (we'll call that $treeish here) to a
586   # commit sha1 ($commit).
587   my $treeish = $Job->{'script_version'};
588   my $commit;
589   if ($repo_location eq 'remote') {
590     # We minimize excess object-fetching by re-using the same bare
591     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
592     # just keep adding remotes to it as needed.
593     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
594     my $gitcmd = "git --git-dir=\Q$local_repo\E";
595
596     # Set up our local repo for caching remote objects, making
597     # archives, etc.
598     if (!-d $local_repo) {
599       make_path($local_repo) or croak("Error: could not create $local_repo");
600     }
601     # This works (exits 0 and doesn't delete fetched objects) even
602     # if $local_repo is already initialized:
603     `$gitcmd init --bare`;
604     if ($?) {
605       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
606     }
607
608     # If $treeish looks like a hash (or abbrev hash) we look it up in
609     # our local cache first, since that's cheaper. (We don't want to
610     # do that with tags/branches though -- those change over time, so
611     # they should always be resolved by the remote repo.)
612     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
613       # Hide stderr because it's normal for this to fail:
614       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
615       if ($? == 0 &&
616           # Careful not to resolve a branch named abcdeff to commit 1234567:
617           $sha1 =~ /^$treeish/ &&
618           $sha1 =~ /^([0-9a-f]{40})$/s) {
619         $commit = $1;
620         Log(undef, "Commit $commit already present in $local_repo");
621       }
622     }
623
624     if (!defined $commit) {
625       # If $treeish isn't just a hash or abbrev hash, or isn't here
626       # yet, we need to fetch the remote to resolve it correctly.
627
628       # First, remove all local heads. This prevents a name that does
629       # not exist on the remote from resolving to (or colliding with)
630       # a previously fetched branch or tag (possibly from a different
631       # remote).
632       remove_tree("$local_repo/refs/heads", {keep_root => 1});
633
634       Log(undef, "Fetching objects from $repo to $local_repo");
635       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
636       if ($?) {
637         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
638       }
639     }
640
641     # Now that the data is all here, we will use our local repo for
642     # the rest of our git activities.
643     $repo = $local_repo;
644   }
645
646   my $gitcmd = "git --git-dir=\Q$repo\E";
647   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
648   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
649     croak("`$gitcmd rev-list` exited "
650           .exit_status_s($?)
651           .", '$treeish' not found, giving up");
652   }
653   $commit = $1;
654   Log(undef, "Version $treeish is commit $commit");
655
656   if ($commit ne $Job->{'script_version'}) {
657     # Record the real commit id in the database, frozentokey, logs,
658     # etc. -- instead of an abbreviation or a branch name which can
659     # become ambiguous or point to a different commit in the future.
660     if (!$Job->update_attributes('script_version' => $commit)) {
661       croak("Error: failed to update job's script_version attribute");
662     }
663   }
664
665   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
666   add_git_archive("$gitcmd archive ''\Q$commit\E");
667 }
668
669 my $git_archive = combined_git_archive();
670 if (!defined $git_archive) {
671   Log(undef, "Skip install phase (no git archive)");
672   if ($have_slurm) {
673     Log(undef, "Warning: This probably means workers have no source tree!");
674   }
675 }
676 else {
677   my $exited;
678   my $install_script_tries_left = 3;
679   for (my $attempts = 0; $attempts < 3; $attempts++) {
680     my @srunargs = ("srun",
681                     "--nodelist=$nodelist",
682                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
683     my @execargs = ("sh", "-c",
684                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
685
686     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
687     my ($stdout, $stderr, $tempfail);
688     ($exited, $stdout, $stderr, $tempfail) = srun_sync(
689       \@srunargs, \@execargs,
690       {label => "run install script on all workers"},
691         $build_script . $git_archive);
692     if ($tempfail) {
693       exit_retry_unlocked();
694     }
695
696     my $stderr_anything_from_script = 0;
697     for my $line (split(/\n/, $stderr)) {
698       if ($line !~ /^(srun: error: |starting: \[)/) {
699         $stderr_anything_from_script = 1;
700       }
701     }
702
703     last if $exited == 0 || $main::please_freeze;
704
705     # If the install script fails but doesn't print an error message,
706     # the next thing anyone is likely to do is just run it again in
707     # case it was a transient problem like "slurm communication fails
708     # because the network isn't reliable enough". So we'll just do
709     # that ourselves (up to 3 attempts in total). OTOH, if there is an
710     # error message, the problem is more likely to have a real fix and
711     # we should fail the job so the fixing process can start, instead
712     # of doing 2 more attempts.
713     last if $stderr_anything_from_script;
714   }
715
716   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
717     unlink($tar_filename);
718   }
719
720   if ($exited != 0) {
721     croak("Giving up");
722   }
723 }
724
725 foreach (qw (script script_version script_parameters runtime_constraints))
726 {
727   Log (undef,
728        "$_ " .
729        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
730 }
731 foreach (split (/\n/, $Job->{knobs}))
732 {
733   Log (undef, "knob " . $_);
734 }
735 my $resp = api_call(
736   'nodes/list',
737   'filters' => [['hostname', 'in', \@node]],
738   'order' => 'hostname',
739   'limit' => scalar(@node),
740     );
741 for my $n (@{$resp->{items}}) {
742   Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
743 }
744
745
746
747 $main::success = undef;
748
749
750
751 ONELEVEL:
752
753 my $thisround_succeeded = 0;
754 my $thisround_failed = 0;
755 my $thisround_failed_multiple = 0;
756 my $working_slot_count = scalar(@slot);
757
758 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
759                        or $a <=> $b } @jobstep_todo;
760 my $level = $jobstep[$jobstep_todo[0]]->{level};
761
762 my $initial_tasks_this_level = 0;
763 foreach my $id (@jobstep_todo) {
764   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
765 }
766
767 # If the number of tasks scheduled at this level #T is smaller than the number
768 # of slots available #S, only use the first #T slots, or the first slot on
769 # each node, whichever number is greater.
770 #
771 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
772 # based on these numbers.  Using fewer slots makes more resources available
773 # to each individual task, which should normally be a better strategy when
774 # there are fewer of them running with less parallelism.
775 #
776 # Note that this calculation is not redone if the initial tasks at
777 # this level queue more tasks at the same level.  This may harm
778 # overall task throughput for that level.
779 my @freeslot;
780 if ($initial_tasks_this_level < @node) {
781   @freeslot = (0..$#node);
782 } elsif ($initial_tasks_this_level < @slot) {
783   @freeslot = (0..$initial_tasks_this_level - 1);
784 } else {
785   @freeslot = (0..$#slot);
786 }
787 my $round_num_freeslots = scalar(@freeslot);
788 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
789
790 my %round_max_slots = ();
791 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
792   my $this_slot = $slot[$freeslot[$ii]];
793   my $node_name = $this_slot->{node}->{name};
794   $round_max_slots{$node_name} ||= $this_slot->{cpu};
795   last if (scalar(keys(%round_max_slots)) >= @node);
796 }
797
798 Log(undef, "start level $level with $round_num_freeslots slots");
799 my @holdslot;
800 my %reader;
801 my $progress_is_dirty = 1;
802 my $progress_stats_updated = 0;
803
804 update_progress_stats();
805
806
807 THISROUND:
808 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
809 {
810   # Don't create new tasks if we already know the job's final result.
811   last if defined($main::success);
812
813   my $id = $jobstep_todo[$todo_ptr];
814   my $Jobstep = $jobstep[$id];
815   if ($Jobstep->{level} != $level)
816   {
817     next;
818   }
819
820   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
821   set_nonblocking($reader{$id});
822
823   my $childslot = $freeslot[0];
824   my $childnode = $slot[$childslot]->{node};
825   my $childslotname = join (".",
826                             $slot[$childslot]->{node}->{name},
827                             $slot[$childslot]->{cpu});
828
829   my $childpid = fork();
830   if ($childpid == 0)
831   {
832     $SIG{'INT'} = 'DEFAULT';
833     $SIG{'QUIT'} = 'DEFAULT';
834     $SIG{'TERM'} = 'DEFAULT';
835
836     foreach (values (%reader))
837     {
838       close($_);
839     }
840     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
841     open(STDOUT,">&writer") or croak ($!);
842     open(STDERR,">&writer") or croak ($!);
843
844     undef $dbh;
845     undef $sth;
846
847     delete $ENV{"GNUPGHOME"};
848     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
849     $ENV{"TASK_QSEQUENCE"} = $id;
850     $ENV{"TASK_SEQUENCE"} = $level;
851     $ENV{"JOB_SCRIPT"} = $Job->{script};
852     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
853       $param =~ tr/a-z/A-Z/;
854       $ENV{"JOB_PARAMETER_$param"} = $value;
855     }
856     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
857     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
858     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
859     $ENV{"HOME"} = $ENV{"TASK_WORK"};
860     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
861     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
862     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
863
864     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
865
866     $ENV{"GZIP"} = "-n";
867
868     my @srunargs = (
869       "srun",
870       "--nodelist=".$childnode->{name},
871       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
872       "--job-name=$job_id.$id.$$",
873         );
874
875     my $stdbuf = " stdbuf --output=0 --error=0 ";
876
877     my $arv_file_cache = "";
878     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
879       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
880     }
881
882     my $command =
883         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
884         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
885         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
886         # These environment variables get used explicitly later in
887         # $command.  No tool is expected to read these values directly.
888         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
889         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
890         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
891         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
892         .q{&& declare -a VOLUMES=() }
893         .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
894         .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
895         .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
896
897     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
898     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
899     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
900
901     if ($docker_hash)
902     {
903       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
904       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
905       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
906       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
907       # We only set memory limits if Docker lets us limit both memory and swap.
908       # Memory limits alone have been supported longer, but subprocesses tend
909       # to get SIGKILL if they exceed that without any swap limit set.
910       # See #5642 for additional background.
911       if ($docker_limitmem) {
912         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
913       }
914
915       # The source tree and $destdir directory (which we have
916       # installed on the worker host) are available in the container,
917       # under the same path.
918       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
919       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
920
921       # Currently, we make the "by_pdh" directory in arv-mount's mount
922       # point appear at /keep inside the container (instead of using
923       # the same path as the host like we do with CRUNCH_SRC and
924       # CRUNCH_INSTALL). However, crunch scripts and utilities must
925       # not rely on this. They must use $TASK_KEEPMOUNT.
926       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
927       $ENV{TASK_KEEPMOUNT} = "/keep";
928
929       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
930       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
931       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
932
933       # TASK_WORK is almost exactly like a docker data volume: it
934       # starts out empty, is writable, and persists until no
935       # containers use it any more. We don't use --volumes-from to
936       # share it with other containers: it is only accessible to this
937       # task, and it goes away when this task stops.
938       #
939       # However, a docker data volume is writable only by root unless
940       # the mount point already happens to exist in the container with
941       # different permissions. Therefore, we [1] assume /tmp already
942       # exists in the image and is writable by the crunch user; [2]
943       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
944       # writable if they are created by docker while setting up the
945       # other --volumes); and [3] create $TASK_WORK inside the
946       # container using $build_script.
947       $command .= "--volume=/tmp ";
948       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
949       $ENV{"HOME"} = $ENV{"TASK_WORK"};
950       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
951
952       # TODO: Share a single JOB_WORK volume across all task
953       # containers on a given worker node, and delete it when the job
954       # ends (and, in case that doesn't work, when the next job
955       # starts).
956       #
957       # For now, use the same approach as TASK_WORK above.
958       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
959
960       # Bind mount the crunchrunner binary and host TLS certificates file into
961       # the container.
962       $command .= '"${VOLUMES[@]}" ';
963
964       while (my ($env_key, $env_val) = each %ENV)
965       {
966         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
967           $command .= "--env=\Q$env_key=$env_val\E ";
968         }
969       }
970       $command .= "--env=\QHOME=$ENV{HOME}\E ";
971       $command .= "\Q$docker_hash\E ";
972
973       if ($Job->{arvados_sdk_version}) {
974         $command .= $stdbuf;
975         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
976       } else {
977         $command .= "/bin/sh -c \'python -c " .
978             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
979             ">&2 2>/dev/null; " .
980             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
981             "if which stdbuf >/dev/null ; then " .
982             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
983             " else " .
984             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
985             " fi\'";
986       }
987     } else {
988       # Non-docker run
989       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
990       $command .= $stdbuf;
991       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
992     }
993
994     my @execargs = ('bash', '-c', $command);
995     srun (\@srunargs, \@execargs, undef, $build_script);
996     # exec() failed, we assume nothing happened.
997     die "srun() failed on build script\n";
998   }
999   close("writer");
1000   if (!defined $childpid)
1001   {
1002     close $reader{$id};
1003     delete $reader{$id};
1004     next;
1005   }
1006   shift @freeslot;
1007   $proc{$childpid} = {
1008     jobstepidx => $id,
1009     time => time,
1010     slot => $childslot,
1011     jobstepname => "$job_id.$id.$childpid",
1012   };
1013   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1014   $slot[$childslot]->{pid} = $childpid;
1015
1016   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1017   Log ($id, "child $childpid started on $childslotname");
1018   $Jobstep->{starttime} = time;
1019   $Jobstep->{node} = $childnode->{name};
1020   $Jobstep->{slotindex} = $childslot;
1021   delete $Jobstep->{stderr};
1022   delete $Jobstep->{finishtime};
1023   delete $Jobstep->{tempfail};
1024
1025   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1026   retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1027
1028   splice @jobstep_todo, $todo_ptr, 1;
1029   --$todo_ptr;
1030
1031   $progress_is_dirty = 1;
1032
1033   while (!@freeslot
1034          ||
1035          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1036   {
1037     last THISROUND if $main::please_freeze;
1038     if ($main::please_info)
1039     {
1040       $main::please_info = 0;
1041       freeze();
1042       create_output_collection();
1043       save_meta(1);
1044       update_progress_stats();
1045     }
1046     my $gotsome
1047         = readfrompipes ()
1048         + reapchildren ();
1049     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1050     {
1051       check_refresh_wanted();
1052       check_squeue();
1053       update_progress_stats();
1054     }
1055     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1056     {
1057       update_progress_stats();
1058     }
1059     if (!$gotsome) {
1060       select (undef, undef, undef, 0.1);
1061     }
1062     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1063                                         $_->{node}->{hold_count} < 4 } @slot);
1064     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1065         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1066     {
1067       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1068           .($thisround_failed+$thisround_succeeded)
1069           .") -- giving up on this round";
1070       Log (undef, $message);
1071       last THISROUND;
1072     }
1073
1074     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1075     for (my $i=$#freeslot; $i>=0; $i--) {
1076       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1077         push @holdslot, (splice @freeslot, $i, 1);
1078       }
1079     }
1080     for (my $i=$#holdslot; $i>=0; $i--) {
1081       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1082         push @freeslot, (splice @holdslot, $i, 1);
1083       }
1084     }
1085
1086     # give up if no nodes are succeeding
1087     if ($working_slot_count < 1) {
1088       Log(undef, "Every node has failed -- giving up");
1089       last THISROUND;
1090     }
1091   }
1092 }
1093
1094
1095 push @freeslot, splice @holdslot;
1096 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1097
1098
1099 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1100 while (%proc)
1101 {
1102   if ($main::please_continue) {
1103     $main::please_continue = 0;
1104     goto THISROUND;
1105   }
1106   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1107   readfrompipes ();
1108   if (!reapchildren())
1109   {
1110     check_refresh_wanted();
1111     check_squeue();
1112     update_progress_stats();
1113     select (undef, undef, undef, 0.1);
1114     killem (keys %proc) if $main::please_freeze;
1115   }
1116 }
1117
1118 update_progress_stats();
1119 freeze_if_want_freeze();
1120
1121
1122 if (!defined $main::success)
1123 {
1124   if (!@jobstep_todo) {
1125     $main::success = 1;
1126   } elsif ($working_slot_count < 1) {
1127     save_output_collection();
1128     save_meta();
1129     exit_retry_unlocked();
1130   } elsif ($thisround_succeeded == 0 &&
1131            ($thisround_failed == 0 || $thisround_failed > 4)) {
1132     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1133     Log (undef, $message);
1134     $main::success = 0;
1135   }
1136 }
1137
1138 goto ONELEVEL if !defined $main::success;
1139
1140
1141 release_allocation();
1142 freeze();
1143 my $collated_output = save_output_collection();
1144 Log (undef, "finish");
1145
1146 my $final_log = save_meta();
1147
1148 my $final_state;
1149 if ($collated_output && $final_log && $main::success) {
1150   $final_state = 'Complete';
1151 } else {
1152   $final_state = 'Failed';
1153 }
1154 $Job->update_attributes('state' => $final_state);
1155
1156 exit (($final_state eq 'Complete') ? 0 : 1);
1157
1158
1159
1160 sub update_progress_stats
1161 {
1162   $progress_stats_updated = time;
1163   return if !$progress_is_dirty;
1164   my ($todo, $done, $running) = (scalar @jobstep_todo,
1165                                  scalar @jobstep_done,
1166                                  scalar keys(%proc));
1167   $Job->{'tasks_summary'} ||= {};
1168   $Job->{'tasks_summary'}->{'todo'} = $todo;
1169   $Job->{'tasks_summary'}->{'done'} = $done;
1170   $Job->{'tasks_summary'}->{'running'} = $running;
1171   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1172   Log (undef, "status: $done done, $running running, $todo todo");
1173   $progress_is_dirty = 0;
1174 }
1175
1176
1177
1178 sub reapchildren
1179 {
1180   my $children_reaped = 0;
1181   my @successful_task_uuids = ();
1182
1183   while((my $pid = waitpid (-1, WNOHANG)) > 0)
1184   {
1185     my $childstatus = $?;
1186
1187     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1188                     . "."
1189                     . $slot[$proc{$pid}->{slot}]->{cpu});
1190     my $jobstepidx = $proc{$pid}->{jobstepidx};
1191
1192     readfrompipes_after_exit ($jobstepidx);
1193
1194     $children_reaped++;
1195     my $elapsed = time - $proc{$pid}->{time};
1196     my $Jobstep = $jobstep[$jobstepidx];
1197
1198     my $exitvalue = $childstatus >> 8;
1199     my $exitinfo = "exit ".exit_status_s($childstatus);
1200     $Jobstep->{'arvados_task'}->reload;
1201     my $task_success = $Jobstep->{'arvados_task'}->{success};
1202
1203     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1204
1205     if (!defined $task_success) {
1206       # task did not indicate one way or the other --> fail
1207       Log($jobstepidx, sprintf(
1208             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1209             exit_status_s($childstatus)));
1210       $Jobstep->{'arvados_task'}->{success} = 0;
1211       retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1212       $task_success = 0;
1213     }
1214
1215     if (!$task_success)
1216     {
1217       my $temporary_fail;
1218       $temporary_fail ||= $Jobstep->{tempfail};
1219       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1220
1221       ++$thisround_failed;
1222       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1223
1224       # Check for signs of a failed or misconfigured node
1225       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1226           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1227         # Don't count this against jobstep failure thresholds if this
1228         # node is already suspected faulty and srun exited quickly
1229         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1230             $elapsed < 5) {
1231           Log ($jobstepidx, "blaming failure on suspect node " .
1232                $slot[$proc{$pid}->{slot}]->{node}->{name});
1233           $temporary_fail ||= 1;
1234         }
1235         ban_node_by_slot($proc{$pid}->{slot});
1236       }
1237
1238       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1239                                 ++$Jobstep->{'failures'},
1240                                 $temporary_fail ? 'temporary' : 'permanent',
1241                                 $elapsed));
1242
1243       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1244         # Give up on this task, and the whole job
1245         $main::success = 0;
1246       }
1247       # Put this task back on the todo queue
1248       push @jobstep_todo, $jobstepidx;
1249       $Job->{'tasks_summary'}->{'failed'}++;
1250     }
1251     else # task_success
1252     {
1253       push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1254       ++$thisround_succeeded;
1255       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1256       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1257       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1258       push @jobstep_done, $jobstepidx;
1259       Log ($jobstepidx, "success in $elapsed seconds");
1260     }
1261     $Jobstep->{exitcode} = $childstatus;
1262     $Jobstep->{finishtime} = time;
1263     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1264     retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1265     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1266                               length($Jobstep->{'arvados_task'}->{output}),
1267                               $Jobstep->{'arvados_task'}->{output}));
1268
1269     close $reader{$jobstepidx};
1270     delete $reader{$jobstepidx};
1271     delete $slot[$proc{$pid}->{slot}]->{pid};
1272     push @freeslot, $proc{$pid}->{slot};
1273     delete $proc{$pid};
1274
1275     $progress_is_dirty = 1;
1276   }
1277
1278   if (scalar(@successful_task_uuids) > 0)
1279   {
1280     Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1281     # Load new tasks
1282     my $newtask_list = [];
1283     my $newtask_results;
1284     do {
1285       $newtask_results = api_call(
1286         "job_tasks/list",
1287         'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1288         'order' => 'qsequence',
1289         'offset' => scalar(@$newtask_list),
1290           );
1291       push(@$newtask_list, @{$newtask_results->{items}});
1292     } while (@{$newtask_results->{items}});
1293     Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1294     foreach my $arvados_task (@$newtask_list) {
1295       my $jobstep = {
1296         'level' => $arvados_task->{'sequence'},
1297         'failures' => 0,
1298         'arvados_task' => $arvados_task
1299       };
1300       push @jobstep, $jobstep;
1301       push @jobstep_todo, $#jobstep;
1302     }
1303   }
1304
1305   return $children_reaped;
1306 }
1307
1308 sub check_refresh_wanted
1309 {
1310   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1311   if (@stat &&
1312       $stat[9] > $latest_refresh &&
1313       # ...and we have actually locked the job record...
1314       $job_id eq $Job->{'uuid'}) {
1315     $latest_refresh = scalar time;
1316     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1317     for my $attr ('cancelled_at',
1318                   'cancelled_by_user_uuid',
1319                   'cancelled_by_client_uuid',
1320                   'state') {
1321       $Job->{$attr} = $Job2->{$attr};
1322     }
1323     if ($Job->{'state'} ne "Running") {
1324       if ($Job->{'state'} eq "Cancelled") {
1325         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1326       } else {
1327         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1328       }
1329       $main::success = 0;
1330       $main::please_freeze = 1;
1331     }
1332   }
1333 }
1334
1335 sub check_squeue
1336 {
1337   my $last_squeue_check = $squeue_checked;
1338
1339   # Do not call `squeue` or check the kill list more than once every
1340   # 15 seconds.
1341   return if $last_squeue_check > time - 15;
1342   $squeue_checked = time;
1343
1344   # Look for children from which we haven't received stderr data since
1345   # the last squeue check. If no such children exist, all procs are
1346   # alive and there's no need to even look at squeue.
1347   #
1348   # As long as the crunchstat poll interval (10s) is shorter than the
1349   # squeue check interval (15s) this should make the squeue check an
1350   # infrequent event.
1351   my $silent_procs = 0;
1352   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1353   {
1354     if (!exists($js->{stderr_at}))
1355     {
1356       $js->{stderr_at} = 0;
1357     }
1358     if ($js->{stderr_at} < $last_squeue_check)
1359     {
1360       $silent_procs++;
1361     }
1362   }
1363   return if $silent_procs == 0;
1364
1365   # use killem() on procs whose killtime is reached
1366   while (my ($pid, $procinfo) = each %proc)
1367   {
1368     my $js = $jobstep[$procinfo->{jobstepidx}];
1369     if (exists $procinfo->{killtime}
1370         && $procinfo->{killtime} <= time
1371         && $js->{stderr_at} < $last_squeue_check)
1372     {
1373       my $sincewhen = "";
1374       if ($js->{stderr_at}) {
1375         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1376       }
1377       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1378       killem ($pid);
1379     }
1380   }
1381
1382   if (!$have_slurm)
1383   {
1384     # here is an opportunity to check for mysterious problems with local procs
1385     return;
1386   }
1387
1388   # Get a list of steps still running.  Note: squeue(1) says --steps
1389   # selects a format (which we override anyway) and allows us to
1390   # specify which steps we're interested in (which we don't).
1391   # Importantly, it also changes the meaning of %j from "job name" to
1392   # "step name" and (although this isn't mentioned explicitly in the
1393   # docs) switches from "one line per job" mode to "one line per step"
1394   # mode. Without it, we'd just get a list of one job, instead of a
1395   # list of N steps.
1396   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1397   if ($? != 0)
1398   {
1399     Log(undef, "warning: squeue exit status $? ($!)");
1400     return;
1401   }
1402   chop @squeue;
1403
1404   # which of my jobsteps are running, according to squeue?
1405   my %ok;
1406   for my $jobstepname (@squeue)
1407   {
1408     $ok{$jobstepname} = 1;
1409   }
1410
1411   # Check for child procs >60s old and not mentioned by squeue.
1412   while (my ($pid, $procinfo) = each %proc)
1413   {
1414     if ($procinfo->{time} < time - 60
1415         && $procinfo->{jobstepname}
1416         && !exists $ok{$procinfo->{jobstepname}}
1417         && !exists $procinfo->{killtime})
1418     {
1419       # According to slurm, this task has ended (successfully or not)
1420       # -- but our srun child hasn't exited. First we must wait (30
1421       # seconds) in case this is just a race between communication
1422       # channels. Then, if our srun child process still hasn't
1423       # terminated, we'll conclude some slurm communication
1424       # error/delay has caused the task to die without notifying srun,
1425       # and we'll kill srun ourselves.
1426       $procinfo->{killtime} = time + 30;
1427       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1428     }
1429   }
1430 }
1431
1432 sub check_sinfo
1433 {
1434   # If a node fails in a multi-node "srun" call during job setup, the call
1435   # may hang instead of exiting with a nonzero code.  This function checks
1436   # "sinfo" for the health of the nodes that were allocated and ensures that
1437   # they are all still in the "alloc" state.  If a node that is allocated to
1438   # this job is not in "alloc" state, then set please_freeze.
1439   #
1440   # This is only called from srun_sync() for node configuration.  If a
1441   # node fails doing actual work, there are other recovery mechanisms.
1442
1443   # Do not call `sinfo` more than once every 15 seconds.
1444   return if $sinfo_checked > time - 15;
1445   $sinfo_checked = time;
1446
1447   # The output format "%t" means output node states.
1448   my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
1449   if ($? != 0)
1450   {
1451     Log(undef, "warning: sinfo exit status $? ($!)");
1452     return;
1453   }
1454   chop @sinfo;
1455
1456   foreach (@sinfo)
1457   {
1458     if ($_ != "alloc" && $_ != "alloc*") {
1459       $main::please_freeze = 1;
1460     }
1461   }
1462 }
1463
1464 sub release_allocation
1465 {
1466   if ($have_slurm)
1467   {
1468     Log (undef, "release job allocation");
1469     system "scancel $ENV{SLURM_JOB_ID}";
1470   }
1471 }
1472
1473
1474 sub readfrompipes
1475 {
1476   my $gotsome = 0;
1477   my %fd_job;
1478   my $sel = IO::Select->new();
1479   foreach my $jobstepidx (keys %reader)
1480   {
1481     my $fd = $reader{$jobstepidx};
1482     $sel->add($fd);
1483     $fd_job{$fd} = $jobstepidx;
1484
1485     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1486       $sel->add($stdout_fd);
1487       $fd_job{$stdout_fd} = $jobstepidx;
1488     }
1489   }
1490   # select on all reader fds with 0.1s timeout
1491   my @ready_fds = $sel->can_read(0.1);
1492   foreach my $fd (@ready_fds)
1493   {
1494     my $buf;
1495     if (0 < sysread ($fd, $buf, 65536))
1496     {
1497       $gotsome = 1;
1498       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1499
1500       my $jobstepidx = $fd_job{$fd};
1501       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1502         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1503         next;
1504       }
1505
1506       $jobstep[$jobstepidx]->{stderr_at} = time;
1507       $jobstep[$jobstepidx]->{stderr} .= $buf;
1508
1509       # Consume everything up to the last \n
1510       preprocess_stderr ($jobstepidx);
1511
1512       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1513       {
1514         # If we get a lot of stderr without a newline, chop off the
1515         # front to avoid letting our buffer grow indefinitely.
1516         substr ($jobstep[$jobstepidx]->{stderr},
1517                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1518       }
1519     }
1520   }
1521   return $gotsome;
1522 }
1523
1524
1525 # Consume all full lines of stderr for a jobstep. Everything after the
1526 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1527 # returning.
1528 sub preprocess_stderr
1529 {
1530   my $jobstepidx = shift;
1531   # slotindex is only defined for children running Arvados job tasks.
1532   # Be prepared to handle the undef case (for setup srun calls, etc.).
1533   my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1534
1535   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1536     my $line = $1;
1537     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1538     Log ($jobstepidx, "stderr $line");
1539     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
1540       # If the allocation is revoked, we can't possibly continue, so mark all
1541       # nodes as failed.  This will cause the overall exit code to be
1542       # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
1543       # this job.
1544       $main::please_freeze = 1;
1545       foreach my $st (@slot) {
1546         $st->{node}->{fail_count}++;
1547       }
1548     }
1549     elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
1550       $jobstep[$jobstepidx]->{tempfail} = 1;
1551       if (defined($job_slot_index)) {
1552         $slot[$job_slot_index]->{node}->{fail_count}++;
1553         ban_node_by_slot($job_slot_index);
1554       }
1555     }
1556     elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
1557       $jobstep[$jobstepidx]->{tempfail} = 1;
1558       ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
1559     }
1560     elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1561       $jobstep[$jobstepidx]->{tempfail} = 1;
1562     }
1563   }
1564 }
1565
1566
1567 # Read whatever is still available on its stderr+stdout pipes after
1568 # the given child process has exited.
1569 sub readfrompipes_after_exit
1570 {
1571   my $jobstepidx = shift;
1572
1573   # The fact that the child has exited allows some convenient
1574   # simplifications: (1) all data must have already been written, so
1575   # there's no need to wait for more once sysread returns 0; (2) the
1576   # total amount of data available is bounded by the pipe buffer size,
1577   # so it's safe to read everything into one string.
1578   my $buf;
1579   while (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) {
1580     $jobstep[$jobstepidx]->{stderr_at} = time;
1581     $jobstep[$jobstepidx]->{stderr} .= $buf;
1582   }
1583   if ($jobstep[$jobstepidx]->{stdout_r}) {
1584     while (0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) {
1585       $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1586     }
1587   }
1588   preprocess_stderr ($jobstepidx);
1589
1590   map {
1591     Log ($jobstepidx, "stderr $_");
1592   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1593   $jobstep[$jobstepidx]->{stderr} = '';
1594 }
1595
1596 sub fetch_block
1597 {
1598   my $hash = shift;
1599   my $keep;
1600   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1601     Log(undef, "fetch_block run error from arv-get $hash: $!");
1602     return undef;
1603   }
1604   my $output_block = "";
1605   while (1) {
1606     my $buf;
1607     my $bytes = sysread($keep, $buf, 1024 * 1024);
1608     if (!defined $bytes) {
1609       Log(undef, "fetch_block read error from arv-get: $!");
1610       $output_block = undef;
1611       last;
1612     } elsif ($bytes == 0) {
1613       # sysread returns 0 at the end of the pipe.
1614       last;
1615     } else {
1616       # some bytes were read into buf.
1617       $output_block .= $buf;
1618     }
1619   }
1620   close $keep;
1621   if ($?) {
1622     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1623     $output_block = undef;
1624   }
1625   return $output_block;
1626 }
1627
1628 # Create a collection by concatenating the output of all tasks (each
1629 # task's output is either a manifest fragment, a locator for a
1630 # manifest fragment stored in Keep, or nothing at all). Return the
1631 # portable_data_hash of the new collection.
1632 sub create_output_collection
1633 {
1634   Log (undef, "collate");
1635
1636   my ($child_out, $child_in);
1637   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1638 import arvados
1639 import sys
1640 print (arvados.api("v1").collections().
1641        create(body={"manifest_text": sys.stdin.read(),
1642                     "owner_uuid": sys.argv[2]}).
1643        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1644 }, retry_count(), $Job->{owner_uuid});
1645
1646   my $task_idx = -1;
1647   my $manifest_size = 0;
1648   for (@jobstep)
1649   {
1650     ++$task_idx;
1651     my $output = $_->{'arvados_task'}->{output};
1652     next if (!defined($output));
1653     my $next_write;
1654     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1655       $next_write = fetch_block($output);
1656     } else {
1657       $next_write = $output;
1658     }
1659     if (defined($next_write)) {
1660       if (!defined(syswrite($child_in, $next_write))) {
1661         # There's been an error writing.  Stop the loop.
1662         # We'll log details about the exit code later.
1663         last;
1664       } else {
1665         $manifest_size += length($next_write);
1666       }
1667     } else {
1668       my $uuid = $_->{'arvados_task'}->{'uuid'};
1669       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1670       $main::success = 0;
1671     }
1672   }
1673   close($child_in);
1674   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1675
1676   my $joboutput;
1677   my $s = IO::Select->new($child_out);
1678   if ($s->can_read(120)) {
1679     sysread($child_out, $joboutput, 1024 * 1024);
1680     waitpid($pid, 0);
1681     if ($?) {
1682       Log(undef, "output collection creation exited " . exit_status_s($?));
1683       $joboutput = undef;
1684     } else {
1685       chomp($joboutput);
1686     }
1687   } else {
1688     Log (undef, "timed out while creating output collection");
1689     foreach my $signal (2, 2, 2, 15, 15, 9) {
1690       kill($signal, $pid);
1691       last if waitpid($pid, WNOHANG) == -1;
1692       sleep(1);
1693     }
1694   }
1695   close($child_out);
1696
1697   return $joboutput;
1698 }
1699
1700 # Calls create_output_collection, logs the result, and returns it.
1701 # If that was successful, save that as the output in the job record.
1702 sub save_output_collection {
1703   my $collated_output = create_output_collection();
1704
1705   if (!$collated_output) {
1706     Log(undef, "Failed to write output collection");
1707   }
1708   else {
1709     Log(undef, "job output $collated_output");
1710     $Job->update_attributes('output' => $collated_output);
1711   }
1712   return $collated_output;
1713 }
1714
1715 sub killem
1716 {
1717   foreach (@_)
1718   {
1719     my $sig = 2;                # SIGINT first
1720     if (exists $proc{$_}->{"sent_$sig"} &&
1721         time - $proc{$_}->{"sent_$sig"} > 4)
1722     {
1723       $sig = 15;                # SIGTERM if SIGINT doesn't work
1724     }
1725     if (exists $proc{$_}->{"sent_$sig"} &&
1726         time - $proc{$_}->{"sent_$sig"} > 4)
1727     {
1728       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1729     }
1730     if (!exists $proc{$_}->{"sent_$sig"})
1731     {
1732       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1733       kill $sig, $_;
1734       select (undef, undef, undef, 0.1);
1735       if ($sig == 2)
1736       {
1737         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1738       }
1739       $proc{$_}->{"sent_$sig"} = time;
1740       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1741     }
1742   }
1743 }
1744
1745
1746 sub fhbits
1747 {
1748   my($bits);
1749   for (@_) {
1750     vec($bits,fileno($_),1) = 1;
1751   }
1752   $bits;
1753 }
1754
1755
1756 # Send log output to Keep via arv-put.
1757 #
1758 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1759 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1760 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1761 # $log_pipe_pid is the pid of the arv-put subprocess.
1762 #
1763 # The only functions that should access these variables directly are:
1764 #
1765 # log_writer_start($logfilename)
1766 #     Starts an arv-put pipe, reading data on stdin and writing it to
1767 #     a $logfilename file in an output collection.
1768 #
1769 # log_writer_read_output([$timeout])
1770 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1771 #     Passes $timeout to the select() call, with a default of 0.01.
1772 #     Returns the result of the last read() call on $log_pipe_out, or
1773 #     -1 if read() wasn't called because select() timed out.
1774 #     Only other log_writer_* functions should need to call this.
1775 #
1776 # log_writer_send($txt)
1777 #     Writes $txt to the output log collection.
1778 #
1779 # log_writer_finish()
1780 #     Closes the arv-put pipe and returns the output that it produces.
1781 #
1782 # log_writer_is_active()
1783 #     Returns a true value if there is currently a live arv-put
1784 #     process, false otherwise.
1785 #
1786 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1787     $log_pipe_pid);
1788
1789 sub log_writer_start($)
1790 {
1791   my $logfilename = shift;
1792   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1793                         'arv-put',
1794                         '--stream',
1795                         '--retries', '6',
1796                         '--filename', $logfilename,
1797                         '-');
1798   $log_pipe_out_buf = "";
1799   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1800 }
1801
1802 sub log_writer_read_output {
1803   my $timeout = shift || 0.01;
1804   my $read = -1;
1805   while ($read && $log_pipe_out_select->can_read($timeout)) {
1806     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1807                  length($log_pipe_out_buf));
1808   }
1809   if (!defined($read)) {
1810     Log(undef, "error reading log manifest from arv-put: $!");
1811   }
1812   return $read;
1813 }
1814
1815 sub log_writer_send($)
1816 {
1817   my $txt = shift;
1818   print $log_pipe_in $txt;
1819   log_writer_read_output();
1820 }
1821
1822 sub log_writer_finish()
1823 {
1824   return unless $log_pipe_pid;
1825
1826   close($log_pipe_in);
1827
1828   my $logger_failed = 0;
1829   my $read_result = log_writer_read_output(600);
1830   if ($read_result == -1) {
1831     $logger_failed = -1;
1832     Log (undef, "timed out reading from 'arv-put'");
1833   } elsif ($read_result != 0) {
1834     $logger_failed = -2;
1835     Log(undef, "failed to read arv-put log manifest to EOF");
1836   }
1837
1838   waitpid($log_pipe_pid, 0);
1839   if ($?) {
1840     $logger_failed ||= $?;
1841     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1842   }
1843
1844   close($log_pipe_out);
1845   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1846   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1847       $log_pipe_out_select = undef;
1848
1849   return $arv_put_output;
1850 }
1851
1852 sub log_writer_is_active() {
1853   return $log_pipe_pid;
1854 }
1855
1856 sub Log                         # ($jobstepidx, $logmessage)
1857 {
1858   my ($jobstepidx, $logmessage) = @_;
1859   if ($logmessage =~ /\n/) {
1860     for my $line (split (/\n/, $_[1])) {
1861       Log ($jobstepidx, $line);
1862     }
1863     return;
1864   }
1865   my $fh = select STDERR; $|=1; select $fh;
1866   my $task_qseq = '';
1867   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1868     $task_qseq = $jobstepidx;
1869   }
1870   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1871   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1872   $message .= "\n";
1873   my $datetime;
1874   if (log_writer_is_active() || -t STDERR) {
1875     my @gmtime = gmtime;
1876     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1877                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1878   }
1879   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1880
1881   if (log_writer_is_active()) {
1882     log_writer_send($datetime . " " . $message);
1883   }
1884 }
1885
1886
1887 sub croak
1888 {
1889   my ($package, $file, $line) = caller;
1890   my $message = "@_ at $file line $line\n";
1891   Log (undef, $message);
1892   release_allocation();
1893   freeze() if @jobstep_todo;
1894   create_output_collection() if @jobstep_todo;
1895   cleanup();
1896   save_meta();
1897   die;
1898 }
1899
1900
1901 sub cleanup
1902 {
1903   return unless $Job;
1904   if ($Job->{'state'} eq 'Cancelled') {
1905     $Job->update_attributes('finished_at' => scalar gmtime);
1906   } else {
1907     $Job->update_attributes('state' => 'Failed');
1908   }
1909 }
1910
1911
1912 sub save_meta
1913 {
1914   my $justcheckpoint = shift; # false if this will be the last meta saved
1915   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1916   return unless log_writer_is_active();
1917   my $log_manifest = log_writer_finish();
1918   return unless defined($log_manifest);
1919
1920   if ($Job->{log}) {
1921     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1922     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1923   }
1924
1925   my $log_coll = api_call(
1926     "collections/create", ensure_unique_name => 1, collection => {
1927       manifest_text => $log_manifest,
1928       owner_uuid => $Job->{owner_uuid},
1929       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1930     });
1931   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1932   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1933
1934   return $log_coll->{portable_data_hash};
1935 }
1936
1937
1938 sub freeze_if_want_freeze
1939 {
1940   if ($main::please_freeze)
1941   {
1942     release_allocation();
1943     if (@_)
1944     {
1945       # kill some srun procs before freeze+stop
1946       map { $proc{$_} = {} } @_;
1947       while (%proc)
1948       {
1949         killem (keys %proc);
1950         select (undef, undef, undef, 0.1);
1951         my $died;
1952         while (($died = waitpid (-1, WNOHANG)) > 0)
1953         {
1954           delete $proc{$died};
1955         }
1956       }
1957     }
1958     freeze();
1959     create_output_collection();
1960     cleanup();
1961     save_meta();
1962     exit 1;
1963   }
1964 }
1965
1966
1967 sub freeze
1968 {
1969   Log (undef, "Freeze not implemented");
1970   return;
1971 }
1972
1973
1974 sub thaw
1975 {
1976   croak ("Thaw not implemented");
1977 }
1978
1979
1980 sub freezequote
1981 {
1982   my $s = shift;
1983   $s =~ s/\\/\\\\/g;
1984   $s =~ s/\n/\\n/g;
1985   return $s;
1986 }
1987
1988
1989 sub freezeunquote
1990 {
1991   my $s = shift;
1992   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1993   return $s;
1994 }
1995
1996 sub srun_sync
1997 {
1998   my $srunargs = shift;
1999   my $execargs = shift;
2000   my $opts = shift || {};
2001   my $stdin = shift;
2002
2003   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
2004   Log (undef, "$label: start");
2005
2006   my ($stderr_r, $stderr_w);
2007   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
2008
2009   my ($stdout_r, $stdout_w);
2010   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
2011
2012   my $started_srun = scalar time;
2013
2014   my $srunpid = fork();
2015   if ($srunpid == 0)
2016   {
2017     close($stderr_r);
2018     close($stdout_r);
2019     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
2020     fcntl($stdout_w, F_SETFL, 0) or croak($!);
2021     open(STDERR, ">&", $stderr_w) or croak ($!);
2022     open(STDOUT, ">&", $stdout_w) or croak ($!);
2023     srun ($srunargs, $execargs, $opts, $stdin);
2024     exit (1);
2025   }
2026   close($stderr_w);
2027   close($stdout_w);
2028
2029   set_nonblocking($stderr_r);
2030   set_nonblocking($stdout_r);
2031
2032   # Add entries to @jobstep and %proc so check_squeue() and
2033   # freeze_if_want_freeze() can treat it like a job task process.
2034   push @jobstep, {
2035     stderr => '',
2036     stderr_at => 0,
2037     stderr_captured => '',
2038     stdout_r => $stdout_r,
2039     stdout_captured => '',
2040   };
2041   my $jobstepidx = $#jobstep;
2042   $proc{$srunpid} = {
2043     jobstepidx => $jobstepidx,
2044   };
2045   $reader{$jobstepidx} = $stderr_r;
2046
2047   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
2048     my $busy = readfrompipes();
2049     if (!$busy || ($latest_refresh + 2 < scalar time)) {
2050       check_refresh_wanted();
2051       check_squeue();
2052       check_sinfo();
2053     }
2054     if (!$busy) {
2055       select(undef, undef, undef, 0.1);
2056     }
2057     if (($started_srun + $srun_sync_timeout) < scalar time) {
2058       # Exceeded general timeout for "srun_sync" operations, likely
2059       # means something got stuck on the remote node.
2060       Log(undef, "srun_sync exceeded timeout, will fail.");
2061       $main::please_freeze = 1;
2062     }
2063     killem(keys %proc) if $main::please_freeze;
2064   }
2065   my $exited = $?;
2066
2067   readfrompipes_after_exit ($jobstepidx);
2068
2069   Log (undef, "$label: exit ".exit_status_s($exited));
2070
2071   close($stdout_r);
2072   close($stderr_r);
2073   delete $proc{$srunpid};
2074   delete $reader{$jobstepidx};
2075
2076   my $j = pop @jobstep;
2077   # If the srun showed signs of tempfail, ensure the caller treats that as a
2078   # failure case.
2079   if ($main::please_freeze || $j->{tempfail}) {
2080     $exited ||= 255;
2081   }
2082   return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
2083 }
2084
2085
2086 sub srun
2087 {
2088   my $srunargs = shift;
2089   my $execargs = shift;
2090   my $opts = shift || {};
2091   my $stdin = shift;
2092   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
2093
2094   $Data::Dumper::Terse = 1;
2095   $Data::Dumper::Indent = 0;
2096   my $show_cmd = Dumper($args);
2097   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2098   $show_cmd =~ s/\n/ /g;
2099   if ($opts->{fork}) {
2100     Log(undef, "starting: $show_cmd");
2101   } else {
2102     # This is a child process: parent is in charge of reading our
2103     # stderr and copying it to Log() if needed.
2104     warn "starting: $show_cmd\n";
2105   }
2106
2107   if (defined $stdin) {
2108     my $child = open STDIN, "-|";
2109     defined $child or die "no fork: $!";
2110     if ($child == 0) {
2111       print $stdin or die $!;
2112       close STDOUT or die $!;
2113       exit 0;
2114     }
2115   }
2116
2117   return system (@$args) if $opts->{fork};
2118
2119   exec @$args;
2120   warn "ENV size is ".length(join(" ",%ENV));
2121   die "exec failed: $!: @$args";
2122 }
2123
2124
2125 sub ban_node_by_slot {
2126   # Don't start any new jobsteps on this node for 60 seconds
2127   my $slotid = shift;
2128   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2129   $slot[$slotid]->{node}->{hold_count}++;
2130   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2131 }
2132
2133 sub must_lock_now
2134 {
2135   my ($lockfile, $error_message) = @_;
2136   open L, ">", $lockfile or croak("$lockfile: $!");
2137   if (!flock L, LOCK_EX|LOCK_NB) {
2138     croak("Can't lock $lockfile: $error_message\n");
2139   }
2140 }
2141
2142 sub find_docker_image {
2143   # Given a Keep locator, check to see if it contains a Docker image.
2144   # If so, return its stream name and Docker hash.
2145   # If not, return undef for both values.
2146   my $locator = shift;
2147   my ($streamname, $filename);
2148   my $image = api_call("collections/get", uuid => $locator);
2149   if ($image) {
2150     foreach my $line (split(/\n/, $image->{manifest_text})) {
2151       my @tokens = split(/\s+/, $line);
2152       next if (!@tokens);
2153       $streamname = shift(@tokens);
2154       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2155         if (defined($filename)) {
2156           return (undef, undef);  # More than one file in the Collection.
2157         } else {
2158           $filename = (split(/:/, $filedata, 3))[2];
2159         }
2160       }
2161     }
2162   }
2163   if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
2164     return ($streamname, $1);
2165   } else {
2166     return (undef, undef);
2167   }
2168 }
2169
2170 sub exit_retry_unlocked {
2171   Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED);
2172   exit(EX_RETRY_UNLOCKED);
2173 }
2174
2175 sub retry_count {
2176   # Calculate the number of times an operation should be retried,
2177   # assuming exponential backoff, and that we're willing to retry as
2178   # long as tasks have been running.  Enforce a minimum of 3 retries.
2179   my ($starttime, $endtime, $timediff, $retries);
2180   if (@jobstep) {
2181     $starttime = $jobstep[0]->{starttime};
2182     $endtime = $jobstep[-1]->{finishtime};
2183   }
2184   if (!defined($starttime)) {
2185     $timediff = 0;
2186   } elsif (!defined($endtime)) {
2187     $timediff = time - $starttime;
2188   } else {
2189     $timediff = ($endtime - $starttime) - (time - $endtime);
2190   }
2191   if ($timediff > 0) {
2192     $retries = int(log($timediff) / log(2));
2193   } else {
2194     $retries = 1;  # Use the minimum.
2195   }
2196   return ($retries > 3) ? $retries : 3;
2197 }
2198
2199 sub retry_op {
2200   # Pass in two function references.
2201   # This method will be called with the remaining arguments.
2202   # If it dies, retry it with exponential backoff until it succeeds,
2203   # or until the current retry_count is exhausted.  After each failure
2204   # that can be retried, the second function will be called with
2205   # the current try count (0-based), next try time, and error message.
2206   my $operation = shift;
2207   my $op_text = shift;
2208   my $retries = retry_count();
2209   my $retry_callback = sub {
2210     my ($try_count, $next_try_at, $errmsg) = @_;
2211     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2212     $errmsg =~ s/\s/ /g;
2213     $errmsg =~ s/\s+$//;
2214     my $retry_msg;
2215     if ($next_try_at < time) {
2216       $retry_msg = "Retrying.";
2217     } else {
2218       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2219       $retry_msg = "Retrying at $next_try_fmt.";
2220     }
2221     Log(undef, "$op_text failed: $errmsg. $retry_msg");
2222   };
2223   foreach my $try_count (0..$retries) {
2224     my $next_try = time + (2 ** $try_count);
2225     my $result = eval { $operation->(@_); };
2226     if (!$@) {
2227       return $result;
2228     } elsif ($try_count < $retries) {
2229       $retry_callback->($try_count, $next_try, $@);
2230       my $sleep_time = $next_try - time;
2231       sleep($sleep_time) if ($sleep_time > 0);
2232     }
2233   }
2234   # Ensure the error message ends in a newline, so Perl doesn't add
2235   # retry_op's line number to it.
2236   chomp($@);
2237   die($@ . "\n");
2238 }
2239
2240 sub api_call {
2241   # Pass in a /-separated API method name, and arguments for it.
2242   # This function will call that method, retrying as needed until
2243   # the current retry_count is exhausted, with a log on the first failure.
2244   my $method_name = shift;
2245   my $method = $arv;
2246   foreach my $key (split(/\//, $method_name)) {
2247     $method = $method->{$key};
2248   }
2249   return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
2250 }
2251
2252 sub exit_status_s {
2253   # Given a $?, return a human-readable exit code string like "0" or
2254   # "1" or "0 with signal 1" or "1 with signal 11".
2255   my $exitcode = shift;
2256   my $s = $exitcode >> 8;
2257   if ($exitcode & 0x7f) {
2258     $s .= " with signal " . ($exitcode & 0x7f);
2259   }
2260   if ($exitcode & 0x80) {
2261     $s .= " with core dump";
2262   }
2263   return $s;
2264 }
2265
2266 sub handle_readall {
2267   # Pass in a glob reference to a file handle.
2268   # Read all its contents and return them as a string.
2269   my $fh_glob_ref = shift;
2270   local $/ = undef;
2271   return <$fh_glob_ref>;
2272 }
2273
2274 sub tar_filename_n {
2275   my $n = shift;
2276   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2277 }
2278
2279 sub add_git_archive {
2280   # Pass in a git archive command as a string or list, a la system().
2281   # This method will save its output to be included in the archive sent to the
2282   # build script.
2283   my $git_input;
2284   $git_tar_count++;
2285   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2286     croak("Failed to save git archive: $!");
2287   }
2288   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2289   close($git_input);
2290   waitpid($git_pid, 0);
2291   close(GIT_ARCHIVE);
2292   if ($?) {
2293     croak("Failed to save git archive: git exited " . exit_status_s($?));
2294   }
2295 }
2296
2297 sub combined_git_archive {
2298   # Combine all saved tar archives into a single archive, then return its
2299   # contents in a string.  Return undef if no archives have been saved.
2300   if ($git_tar_count < 1) {
2301     return undef;
2302   }
2303   my $base_tar_name = tar_filename_n(1);
2304   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2305     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2306     if ($tar_exit != 0) {
2307       croak("Error preparing build archive: tar -A exited " .
2308             exit_status_s($tar_exit));
2309     }
2310   }
2311   if (!open(GIT_TAR, "<", $base_tar_name)) {
2312     croak("Could not open build archive: $!");
2313   }
2314   my $tar_contents = handle_readall(\*GIT_TAR);
2315   close(GIT_TAR);
2316   return $tar_contents;
2317 }
2318
2319 sub set_nonblocking {
2320   my $fh = shift;
2321   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2322   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2323 }
2324
2325 __DATA__
2326 #!/usr/bin/env perl
2327 #
2328 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2329 # server invokes this script on individual compute nodes, or localhost if we're
2330 # running a job locally.  It gets called in two modes:
2331 #
2332 # * No arguments: Installation mode.  Read a tar archive from the DATA
2333 #   file handle; it includes the Crunch script's source code, and
2334 #   maybe SDKs as well.  Those should be installed in the proper
2335 #   locations.  This runs outside of any Docker container, so don't try to
2336 #   introspect Crunch's runtime environment.
2337 #
2338 # * With arguments: Crunch script run mode.  This script should set up the
2339 #   environment, then run the command specified in the arguments.  This runs
2340 #   inside any Docker container.
2341
2342 use Fcntl ':flock';
2343 use File::Path qw( make_path remove_tree );
2344 use POSIX qw(getcwd);
2345
2346 use constant TASK_TEMPFAIL => 111;
2347
2348 # Map SDK subdirectories to the path environments they belong to.
2349 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2350
2351 my $destdir = $ENV{"CRUNCH_SRC"};
2352 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2353 my $repo = $ENV{"CRUNCH_SRC_URL"};
2354 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2355 my $job_work = $ENV{"JOB_WORK"};
2356 my $task_work = $ENV{"TASK_WORK"};
2357
2358 open(STDOUT_ORIG, ">&", STDOUT);
2359 open(STDERR_ORIG, ">&", STDERR);
2360
2361 for my $dir ($destdir, $job_work, $task_work) {
2362   if ($dir) {
2363     make_path $dir;
2364     -e $dir or die "Failed to create temporary directory ($dir): $!";
2365   }
2366 }
2367
2368 if ($task_work) {
2369   remove_tree($task_work, {keep_root => 1});
2370 }
2371
2372 ### Crunch script run mode
2373 if (@ARGV) {
2374   # We want to do routine logging during task 0 only.  This gives the user
2375   # the information they need, but avoids repeating the information for every
2376   # task.
2377   my $Log;
2378   if ($ENV{TASK_SEQUENCE} eq "0") {
2379     $Log = sub {
2380       my $msg = shift;
2381       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2382     };
2383   } else {
2384     $Log = sub { };
2385   }
2386
2387   my $python_src = "$install_dir/python";
2388   my $venv_dir = "$job_work/.arvados.venv";
2389   my $venv_built = -e "$venv_dir/bin/activate";
2390   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2391     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2392                  "--python=python2.7", $venv_dir);
2393     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2394     $venv_built = 1;
2395     $Log->("Built Python SDK virtualenv");
2396   }
2397
2398   my @pysdk_version_cmd = ("python", "-c",
2399     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2400   if ($venv_built) {
2401     $Log->("Running in Python SDK virtualenv");
2402     @pysdk_version_cmd = ();
2403     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2404     @ARGV = ("/bin/sh", "-ec",
2405              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2406   } elsif (-d $python_src) {
2407     $Log->("Warning: virtualenv not found inside Docker container default " .
2408            "\$PATH. Can't install Python SDK.");
2409   }
2410
2411   if (@pysdk_version_cmd) {
2412     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2413     my $pysdk_version = <$pysdk_version_pipe>;
2414     close($pysdk_version_pipe);
2415     if ($? == 0) {
2416       chomp($pysdk_version);
2417       $Log->("Using Arvados SDK version $pysdk_version");
2418     } else {
2419       # A lot could've gone wrong here, but pretty much all of it means that
2420       # Python won't be able to load the Arvados SDK.
2421       $Log->("Warning: Arvados SDK not found");
2422     }
2423   }
2424
2425   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2426     my $sdk_path = "$install_dir/$sdk_dir";
2427     if (-d $sdk_path) {
2428       if ($ENV{$sdk_envkey}) {
2429         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2430       } else {
2431         $ENV{$sdk_envkey} = $sdk_path;
2432       }
2433       $Log->("Arvados SDK added to %s", $sdk_envkey);
2434     }
2435   }
2436
2437   exec(@ARGV);
2438   die "Cannot exec `@ARGV`: $!";
2439 }
2440
2441 ### Installation mode
2442 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2443 flock L, LOCK_EX;
2444 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2445   # This exact git archive (source + arvados sdk) is already installed
2446   # here, so there's no need to reinstall it.
2447
2448   # We must consume our DATA section, though: otherwise the process
2449   # feeding it to us will get SIGPIPE.
2450   my $buf;
2451   while (read(DATA, $buf, 65536)) { }
2452
2453   exit(0);
2454 }
2455
2456 unlink "$destdir.archive_hash";
2457 mkdir $destdir;
2458
2459 do {
2460   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2461   local $SIG{PIPE} = "IGNORE";
2462   warn "Extracting archive: $archive_hash\n";
2463   # --ignore-zeros is necessary sometimes: depending on how much NUL
2464   # padding tar -A put on our combined archive (which in turn depends
2465   # on the length of the component archives) tar without
2466   # --ignore-zeros will exit before consuming stdin and cause close()
2467   # to fail on the resulting SIGPIPE.
2468   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2469     die "Error launching 'tar -xC $destdir': $!";
2470   }
2471   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2472   # get SIGPIPE.  We must feed it data incrementally.
2473   my $tar_input;
2474   while (read(DATA, $tar_input, 65536)) {
2475     print TARX $tar_input;
2476   }
2477   if(!close(TARX)) {
2478     die "'tar -xC $destdir' exited $?: $!";
2479   }
2480 };
2481
2482 mkdir $install_dir;
2483
2484 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2485 if (-d $sdk_root) {
2486   foreach my $sdk_lang (("python",
2487                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2488     if (-d "$sdk_root/$sdk_lang") {
2489       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2490         die "Failed to install $sdk_lang SDK: $!";
2491       }
2492     }
2493   }
2494 }
2495
2496 my $python_dir = "$install_dir/python";
2497 if ((-d $python_dir) and can_run("python2.7")) {
2498   open(my $egg_info_pipe, "-|",
2499        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2500   my @egg_info_errors = <$egg_info_pipe>;
2501   close($egg_info_pipe);
2502
2503   if ($?) {
2504     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2505       # egg_info apparently failed because it couldn't ask git for a build tag.
2506       # Specify no build tag.
2507       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2508       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2509       close($pysdk_cfg);
2510     } else {
2511       my $egg_info_exit = $? >> 8;
2512       foreach my $errline (@egg_info_errors) {
2513         warn $errline;
2514       }
2515       warn "python setup.py egg_info failed: exit $egg_info_exit";
2516       exit ($egg_info_exit || 1);
2517     }
2518   }
2519 }
2520
2521 # Hide messages from the install script (unless it fails: shell_or_die
2522 # will show $destdir.log in that case).
2523 open(STDOUT, ">>", "$destdir.log") or die ($!);
2524 open(STDERR, ">&", STDOUT) or die ($!);
2525
2526 if (-e "$destdir/crunch_scripts/install") {
2527     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2528 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2529     # Old version
2530     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2531 } elsif (-e "./install.sh") {
2532     shell_or_die (undef, "./install.sh", $install_dir);
2533 }
2534
2535 if ($archive_hash) {
2536     unlink "$destdir.archive_hash.new";
2537     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2538     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2539 }
2540
2541 close L;
2542
2543 sub can_run {
2544   my $command_name = shift;
2545   open(my $which, "-|", "which", $command_name) or die ($!);
2546   while (<$which>) { }
2547   close($which);
2548   return ($? == 0);
2549 }
2550
2551 sub shell_or_die
2552 {
2553   my $exitcode = shift;
2554
2555   if ($ENV{"DEBUG"}) {
2556     print STDERR "@_\n";
2557   }
2558   if (system (@_) != 0) {
2559     my $err = $!;
2560     my $code = $?;
2561     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2562     open STDERR, ">&STDERR_ORIG";
2563     system ("cat $destdir.log >&2");
2564     warn "@_ failed ($err): $exitstatus";
2565     if (defined($exitcode)) {
2566       exit $exitcode;
2567     }
2568     else {
2569       exit (($code >> 8) || 1);
2570     }
2571   }
2572 }
2573
2574 __DATA__