15516: document the use of variables with the nameref argument set.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3 # Copyright (C) The Arvados Authors. All rights reserved.
4 #
5 # SPDX-License-Identifier: AGPL-3.0
6
7 =head1 NAME
8
9 crunch-job: Execute job steps, save snapshots as requested, collate output.
10
11 =head1 SYNOPSIS
12
13 Obtain job details from Arvados, run tasks on compute nodes (typically
14 invoked by scheduler on controller):
15
16  crunch-job --job x-y-z --git-dir /path/to/repo/.git
17
18 Obtain job details from command line, run tasks on local machine
19 (typically invoked by application or developer on VM):
20
21  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
22
23  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
24
25 =head1 OPTIONS
26
27 =over
28
29 =item --force-unlock
30
31 If the job is already locked, steal the lock and run it anyway.
32
33 =item --git-dir
34
35 Path to a .git directory (or a git URL) where the commit given in the
36 job's C<script_version> attribute is to be found. If this is I<not>
37 given, the job's C<repository> attribute will be used.
38
39 =item --job-api-token
40
41 Arvados API authorization token to use during the course of the job.
42
43 =item --no-clear-tmp
44
45 Do not clear per-job/task temporary directories during initial job
46 setup. This can speed up development and debugging when running jobs
47 locally.
48
49 =item --job
50
51 UUID of the job to run, or a JSON-encoded job resource without a
52 UUID. If the latter is given, a new job object will be created.
53
54 =back
55
56 =head1 RUNNING JOBS LOCALLY
57
58 crunch-job's log messages appear on stderr along with the job tasks'
59 stderr streams. The log is saved in Keep at each checkpoint and when
60 the job finishes.
61
62 If the job succeeds, the job's output locator is printed on stdout.
63
64 While the job is running, the following signals are accepted:
65
66 =over
67
68 =item control-C, SIGINT, SIGQUIT
69
70 Save a checkpoint, terminate any job tasks that are running, and stop.
71
72 =item SIGALRM
73
74 Save a checkpoint and continue.
75
76 =item SIGHUP
77
78 Refresh node allocation (i.e., check whether any nodes have been added
79 or unallocated) and attributes of the Job record that should affect
80 behavior (e.g., cancel job if cancelled_at becomes non-nil).
81
82 =back
83
84 =cut
85
86
87 use strict;
88 use POSIX ':sys_wait_h';
89 use POSIX qw(strftime);
90 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
91 use Arvados;
92 use Cwd qw(realpath);
93 use Data::Dumper;
94 use Digest::MD5 qw(md5_hex);
95 use Getopt::Long;
96 use IPC::Open2;
97 use IO::Select;
98 use File::Temp;
99 use Fcntl ':flock';
100 use File::Path qw( make_path remove_tree );
101
102 use constant TASK_TEMPFAIL => 111;
103 use constant EX_TEMPFAIL => 75;
104 use constant EX_RETRY_UNLOCKED => 93;
105
106 $ENV{"TMPDIR"} ||= "/tmp";
107 unless (defined $ENV{"CRUNCH_TMP"}) {
108   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
109   if ($ENV{"USER"} ne "crunch" && $< != 0) {
110     # use a tmp dir unique for my uid
111     $ENV{"CRUNCH_TMP"} .= "-$<";
112   }
113 }
114
115 # Create the tmp directory if it does not exist
116 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
117   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
118 }
119
120 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
121 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
122 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
123 mkdir ($ENV{"JOB_WORK"});
124
125 my %proc;
126 my $force_unlock;
127 my $git_dir;
128 my $jobspec;
129 my $job_api_token;
130 my $no_clear_tmp;
131 my $resume_stash;
132 my $cgroup_root = "/sys/fs/cgroup";
133 my $docker_bin = "docker.io";
134 my $docker_run_args = "";
135 my $srun_sync_timeout = 15*60;
136 GetOptions('force-unlock' => \$force_unlock,
137            'git-dir=s' => \$git_dir,
138            'job=s' => \$jobspec,
139            'job-api-token=s' => \$job_api_token,
140            'no-clear-tmp' => \$no_clear_tmp,
141            'resume-stash=s' => \$resume_stash,
142            'cgroup-root=s' => \$cgroup_root,
143            'docker-bin=s' => \$docker_bin,
144            'docker-run-args=s' => \$docker_run_args,
145            'srun-sync-timeout=i' => \$srun_sync_timeout,
146     );
147
148 if (defined $job_api_token) {
149   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
150 }
151
152 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
153
154
155 $SIG{'USR1'} = sub
156 {
157   $main::ENV{CRUNCH_DEBUG} = 1;
158 };
159 $SIG{'USR2'} = sub
160 {
161   $main::ENV{CRUNCH_DEBUG} = 0;
162 };
163
164 my $arv = Arvados->new('apiVersion' => 'v1');
165
166 my $Job;
167 my $job_id;
168 my $dbh;
169 my $sth;
170 my @jobstep;
171
172 my $local_job;
173 if ($jobspec =~ /^[-a-z\d]+$/)
174 {
175   # $jobspec is an Arvados UUID, not a JSON job specification
176   $Job = api_call("jobs/get", uuid => $jobspec);
177   $local_job = 0;
178 }
179 else
180 {
181   $local_job = JSON::decode_json($jobspec);
182 }
183
184
185 # Make sure our workers (our slurm nodes, localhost, or whatever) are
186 # at least able to run basic commands: they aren't down or severely
187 # misconfigured.
188 my $cmd = ['true'];
189 if (($Job || $local_job)->{docker_image_locator}) {
190   $cmd = [$docker_bin, 'ps', '-q'];
191 }
192 Log(undef, "Sanity check is `@$cmd`");
193 my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
194   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
195   $cmd,
196   {label => "sanity check"});
197 if ($exited != 0) {
198   Log(undef, "Sanity check failed: ".exit_status_s($exited));
199   exit EX_TEMPFAIL;
200 }
201 Log(undef, "Sanity check OK");
202
203
204 my $User = api_call("users/current");
205
206 if (!$local_job) {
207   if (!$force_unlock) {
208     # Claim this job, and make sure nobody else does
209     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
210     if ($@) {
211       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
212       exit EX_TEMPFAIL;
213     };
214   }
215 }
216 else
217 {
218   if (!$resume_stash)
219   {
220     map { croak ("No $_ specified") unless $local_job->{$_} }
221     qw(script script_version script_parameters);
222   }
223
224   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
225   $local_job->{'started_at'} = gmtime;
226   $local_job->{'state'} = 'Running';
227
228   $Job = api_call("jobs/create", job => $local_job);
229 }
230 $job_id = $Job->{'uuid'};
231
232 my $keep_logfile = $job_id . '.log.txt';
233 log_writer_start($keep_logfile);
234
235 $Job->{'runtime_constraints'} ||= {};
236 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
237 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
238
239 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
240 if ($? == 0) {
241   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
242   chomp($gem_versions);
243   chop($gem_versions);  # Closing parentheses
244 } else {
245   $gem_versions = "";
246 }
247 Log(undef,
248     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
249
250 Log (undef, "check slurm allocation");
251 my @slot;
252 my @node;
253 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
254 my @sinfo;
255 if (!$have_slurm)
256 {
257   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
258   push @sinfo, "$localcpus localhost";
259 }
260 if (exists $ENV{SLURM_NODELIST})
261 {
262   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
263 }
264 foreach (@sinfo)
265 {
266   my ($ncpus, $slurm_nodelist) = split;
267   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
268
269   my @nodelist;
270   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
271   {
272     my $nodelist = $1;
273     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
274     {
275       my $ranges = $1;
276       foreach (split (",", $ranges))
277       {
278         my ($a, $b);
279         if (/(\d+)-(\d+)/)
280         {
281           $a = $1;
282           $b = $2;
283         }
284         else
285         {
286           $a = $_;
287           $b = $_;
288         }
289         push @nodelist, map {
290           my $n = $nodelist;
291           $n =~ s/\[[-,\d]+\]/$_/;
292           $n;
293         } ($a..$b);
294       }
295     }
296     else
297     {
298       push @nodelist, $nodelist;
299     }
300   }
301   foreach my $nodename (@nodelist)
302   {
303     Log (undef, "node $nodename - $ncpus slots");
304     my $node = { name => $nodename,
305                  ncpus => $ncpus,
306                  # The number of consecutive times a task has been dispatched
307                  # to this node and failed.
308                  losing_streak => 0,
309                  # The number of consecutive times that SLURM has reported
310                  # a node failure since the last successful task.
311                  fail_count => 0,
312                  # Don't dispatch work to this node until this time
313                  # (in seconds since the epoch) has passed.
314                  hold_until => 0 };
315     foreach my $cpu (1..$ncpus)
316     {
317       push @slot, { node => $node,
318                     cpu => $cpu };
319     }
320   }
321   push @node, @nodelist;
322 }
323
324
325
326 # Ensure that we get one jobstep running on each allocated node before
327 # we start overloading nodes with concurrent steps
328
329 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
330
331
332 $Job->update_attributes(
333   'tasks_summary' => { 'failed' => 0,
334                        'todo' => 1,
335                        'running' => 0,
336                        'done' => 0 });
337
338 Log (undef, "start");
339 $SIG{'INT'} = sub { $main::please_freeze = 1; };
340 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
341 $SIG{'TERM'} = \&croak;
342 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
343 $SIG{'ALRM'} = sub { $main::please_info = 1; };
344 $SIG{'CONT'} = sub { $main::please_continue = 1; };
345 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
346
347 $main::please_freeze = 0;
348 $main::please_info = 0;
349 $main::please_continue = 0;
350 $main::please_refresh = 0;
351 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
352
353 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
354 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
355 $ENV{"JOB_UUID"} = $job_id;
356
357
358 my @jobstep_todo = ();
359 my @jobstep_done = ();
360 my @jobstep_tomerge = ();
361 my $jobstep_tomerge_level = 0;
362 my $squeue_checked = 0;
363 my $sinfo_checked = 0;
364 my $latest_refresh = scalar time;
365
366
367
368 if (defined $Job->{thawedfromkey})
369 {
370   thaw ($Job->{thawedfromkey});
371 }
372 else
373 {
374   my $first_task = api_call("job_tasks/create", job_task => {
375     'job_uuid' => $Job->{'uuid'},
376     'sequence' => 0,
377     'qsequence' => 0,
378     'parameters' => {},
379   });
380   push @jobstep, { 'level' => 0,
381                    'failures' => 0,
382                    'arvados_task' => $first_task,
383                  };
384   push @jobstep_todo, 0;
385 }
386
387
388 if (!$have_slurm)
389 {
390   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
391 }
392
393 my $build_script = handle_readall(\*DATA);
394 my $nodelist = join(",", @node);
395 my $git_tar_count = 0;
396
397 if (!defined $no_clear_tmp) {
398   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
399   # up work directories crunch_tmp/work, crunch_tmp/opt,
400   # crunch_tmp/src*.
401   my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
402     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
403     ['bash', '-ec', q{
404 arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
405 rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
406     }],
407     {label => "clean work dirs"});
408   if ($exited != 0) {
409     exit_retry_unlocked();
410   }
411 }
412
413 # If this job requires a Docker image, install that.
414 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
415 if ($docker_locator = $Job->{docker_image_locator}) {
416   Log (undef, "Install docker image $docker_locator");
417   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
418   if (!$docker_hash)
419   {
420     croak("No Docker image hash found from locator $docker_locator");
421   }
422   Log (undef, "docker image hash is $docker_hash");
423   $docker_stream =~ s/^\.//;
424   my $docker_install_script = qq{
425 loaded() {
426   id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
427   echo "image ID is \$id"
428   [[ \${id} = \Q$docker_hash\E ]]
429 }
430 if loaded >&2 2>/dev/null; then
431   echo >&2 "image is already present"
432   exit 0
433 fi
434 echo >&2 "docker image is not present; loading"
435 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
436 if ! loaded >&2; then
437   echo >&2 "`docker load` exited 0, but image is not found (!)"
438   exit 1
439 fi
440 echo >&2 "image loaded successfully"
441 };
442
443   my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
444     ["srun", "--nodelist=" . join(',', @node)],
445     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
446     {label => "load docker image"});
447   if ($exited != 0)
448   {
449     exit_retry_unlocked();
450   }
451
452   # Determine whether this version of Docker supports memory+swap limits.
453   ($exited, $stdout, $stderr, $tempfail) = srun_sync(
454     ["srun", "--nodes=1"],
455     [$docker_bin, 'run', '--help'],
456     {label => "check --memory-swap feature"});
457   if ($tempfail) {
458     exit_retry_unlocked();
459   }
460   $docker_limitmem = ($stdout =~ /--memory-swap/);
461
462   # Find a non-root Docker user to use.
463   # Tries the default user for the container, then 'crunch', then 'nobody',
464   # testing for whether the actual user id is non-zero.  This defends against
465   # mistakes but not malice, but we intend to harden the security in the future
466   # so we don't want anyone getting used to their jobs running as root in their
467   # Docker containers.
468   my @tryusers = ("", "crunch", "nobody");
469   foreach my $try_user (@tryusers) {
470     my $label;
471     my $try_user_arg;
472     if ($try_user eq "") {
473       $label = "check whether default user is UID 0";
474       $try_user_arg = "";
475     } else {
476       $label = "check whether user '$try_user' is UID 0";
477       $try_user_arg = "--user=$try_user";
478     }
479     my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
480       ["srun", "--nodes=1"],
481       ["/bin/sh", "-ec",
482        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
483       {label => $label});
484     chomp($stdout);
485     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
486       $dockeruserarg = $try_user_arg;
487       if ($try_user eq "") {
488         Log(undef, "Container will run with default user");
489       } else {
490         Log(undef, "Container will run with $dockeruserarg");
491       }
492       last;
493     } elsif ($tempfail) {
494       exit_retry_unlocked();
495     }
496   }
497
498   if (!defined $dockeruserarg) {
499     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
500   }
501
502   if ($Job->{arvados_sdk_version}) {
503     # The job also specifies an Arvados SDK version.  Add the SDKs to the
504     # tar file for the build script to install.
505     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
506                        $Job->{arvados_sdk_version}));
507     add_git_archive("git", "--git-dir=$git_dir", "archive",
508                     "--prefix=.arvados.sdk/",
509                     $Job->{arvados_sdk_version}, "sdk");
510   }
511 }
512
513 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
514   # If script_version looks like an absolute path, *and* the --git-dir
515   # argument was not given -- which implies we were not invoked by
516   # crunch-dispatch -- we will use the given path as a working
517   # directory instead of resolving script_version to a git commit (or
518   # doing anything else with git).
519   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
520   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
521 }
522 else {
523   # Resolve the given script_version to a git commit sha1. Also, if
524   # the repository is remote, clone it into our local filesystem: this
525   # ensures "git archive" will work, and is necessary to reliably
526   # resolve a symbolic script_version like "master^".
527   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
528
529   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
530
531   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
532
533   # If we're running under crunch-dispatch, it will have already
534   # pulled the appropriate source tree into its own repository, and
535   # given us that repo's path as $git_dir.
536   #
537   # If we're running a "local" job, we might have to fetch content
538   # from a remote repository.
539   #
540   # (Currently crunch-dispatch gives a local path with --git-dir, but
541   # we might as well accept URLs there too in case it changes its
542   # mind.)
543   my $repo = $git_dir || $Job->{'repository'};
544
545   # Repository can be remote or local. If remote, we'll need to fetch it
546   # to a local dir before doing `git log` et al.
547   my $repo_location;
548
549   if ($repo =~ m{://|^[^/]*:}) {
550     # $repo is a git url we can clone, like git:// or https:// or
551     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
552     # not recognized here because distinguishing that from a local
553     # path is too fragile. If you really need something strange here,
554     # use the ssh:// form.
555     $repo_location = 'remote';
556   } elsif ($repo =~ m{^\.*/}) {
557     # $repo is a local path to a git index. We'll also resolve ../foo
558     # to ../foo/.git if the latter is a directory. To help
559     # disambiguate local paths from named hosted repositories, this
560     # form must be given as ./ or ../ if it's a relative path.
561     if (-d "$repo/.git") {
562       $repo = "$repo/.git";
563     }
564     $repo_location = 'local';
565   } else {
566     # $repo is none of the above. It must be the name of a hosted
567     # repository.
568     my $arv_repo_list = api_call("repositories/list",
569                                  'filters' => [['name','=',$repo]]);
570     my @repos_found = @{$arv_repo_list->{'items'}};
571     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
572     if ($n_found > 0) {
573       Log(undef, "Repository '$repo' -> "
574           . join(", ", map { $_->{'uuid'} } @repos_found));
575     }
576     if ($n_found != 1) {
577       croak("Error: Found $n_found repositories with name '$repo'.");
578     }
579     $repo = $repos_found[0]->{'fetch_url'};
580     $repo_location = 'remote';
581   }
582   Log(undef, "Using $repo_location repository '$repo'");
583   $ENV{"CRUNCH_SRC_URL"} = $repo;
584
585   # Resolve given script_version (we'll call that $treeish here) to a
586   # commit sha1 ($commit).
587   my $treeish = $Job->{'script_version'};
588   my $commit;
589   if ($repo_location eq 'remote') {
590     # We minimize excess object-fetching by re-using the same bare
591     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
592     # just keep adding remotes to it as needed.
593     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
594     my $gitcmd = "git --git-dir=\Q$local_repo\E";
595
596     # Set up our local repo for caching remote objects, making
597     # archives, etc.
598     if (!-d $local_repo) {
599       make_path($local_repo) or croak("Error: could not create $local_repo");
600     }
601     # This works (exits 0 and doesn't delete fetched objects) even
602     # if $local_repo is already initialized:
603     `$gitcmd init --bare`;
604     if ($?) {
605       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
606     }
607
608     # If $treeish looks like a hash (or abbrev hash) we look it up in
609     # our local cache first, since that's cheaper. (We don't want to
610     # do that with tags/branches though -- those change over time, so
611     # they should always be resolved by the remote repo.)
612     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
613       # Hide stderr because it's normal for this to fail:
614       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
615       if ($? == 0 &&
616           # Careful not to resolve a branch named abcdeff to commit 1234567:
617           $sha1 =~ /^$treeish/ &&
618           $sha1 =~ /^([0-9a-f]{40})$/s) {
619         $commit = $1;
620         Log(undef, "Commit $commit already present in $local_repo");
621       }
622     }
623
624     if (!defined $commit) {
625       # If $treeish isn't just a hash or abbrev hash, or isn't here
626       # yet, we need to fetch the remote to resolve it correctly.
627
628       # First, remove all local heads. This prevents a name that does
629       # not exist on the remote from resolving to (or colliding with)
630       # a previously fetched branch or tag (possibly from a different
631       # remote).
632       remove_tree("$local_repo/refs/heads", {keep_root => 1});
633
634       Log(undef, "Fetching objects from $repo to $local_repo");
635       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
636       if ($?) {
637         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
638       }
639     }
640
641     # Now that the data is all here, we will use our local repo for
642     # the rest of our git activities.
643     $repo = $local_repo;
644   }
645
646   my $gitcmd = "git --git-dir=\Q$repo\E";
647   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
648   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
649     croak("`$gitcmd rev-list` exited "
650           .exit_status_s($?)
651           .", '$treeish' not found, giving up");
652   }
653   $commit = $1;
654   Log(undef, "Version $treeish is commit $commit");
655
656   if ($commit ne $Job->{'script_version'}) {
657     # Record the real commit id in the database, frozentokey, logs,
658     # etc. -- instead of an abbreviation or a branch name which can
659     # become ambiguous or point to a different commit in the future.
660     if (!$Job->update_attributes('script_version' => $commit)) {
661       croak("Error: failed to update job's script_version attribute");
662     }
663   }
664
665   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
666   add_git_archive("$gitcmd archive ''\Q$commit\E");
667 }
668
669 my $git_archive = combined_git_archive();
670 if (!defined $git_archive) {
671   Log(undef, "Skip install phase (no git archive)");
672   if ($have_slurm) {
673     Log(undef, "Warning: This probably means workers have no source tree!");
674   }
675 }
676 else {
677   my $exited;
678   my $install_script_tries_left = 3;
679   for (my $attempts = 0; $attempts < 3; $attempts++) {
680     my @srunargs = ("srun",
681                     "--nodelist=$nodelist",
682                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
683     my @execargs = ("sh", "-c",
684                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
685
686     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
687     my ($stdout, $stderr, $tempfail);
688     ($exited, $stdout, $stderr, $tempfail) = srun_sync(
689       \@srunargs, \@execargs,
690       {label => "run install script on all workers"},
691         $build_script . $git_archive);
692     if ($tempfail) {
693       exit_retry_unlocked();
694     }
695
696     my $stderr_anything_from_script = 0;
697     for my $line (split(/\n/, $stderr)) {
698       if ($line !~ /^(srun: error: |starting: \[)/) {
699         $stderr_anything_from_script = 1;
700       }
701     }
702
703     last if $exited == 0 || $main::please_freeze;
704
705     # If the install script fails but doesn't print an error message,
706     # the next thing anyone is likely to do is just run it again in
707     # case it was a transient problem like "slurm communication fails
708     # because the network isn't reliable enough". So we'll just do
709     # that ourselves (up to 3 attempts in total). OTOH, if there is an
710     # error message, the problem is more likely to have a real fix and
711     # we should fail the job so the fixing process can start, instead
712     # of doing 2 more attempts.
713     last if $stderr_anything_from_script;
714   }
715
716   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
717     unlink($tar_filename);
718   }
719
720   if ($exited != 0) {
721     croak("Giving up");
722   }
723 }
724
725 foreach (qw (script script_version script_parameters runtime_constraints))
726 {
727   Log (undef,
728        "$_ " .
729        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
730 }
731 foreach (split (/\n/, $Job->{knobs}))
732 {
733   Log (undef, "knob " . $_);
734 }
735 my $resp = api_call(
736   'nodes/list',
737   'filters' => [['hostname', 'in', \@node]],
738   'order' => 'hostname',
739   'limit' => scalar(@node),
740     );
741 for my $n (@{$resp->{items}}) {
742   Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
743 }
744
745
746
747 $main::success = undef;
748
749
750
751 ONELEVEL:
752
753 my $thisround_succeeded = 0;
754 my $thisround_failed = 0;
755 my $thisround_failed_multiple = 0;
756 my $working_slot_count = scalar(@slot);
757
758 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
759                        or $a <=> $b } @jobstep_todo;
760 my $level = $jobstep[$jobstep_todo[0]]->{level};
761
762 my $initial_tasks_this_level = 0;
763 foreach my $id (@jobstep_todo) {
764   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
765 }
766
767 # If the number of tasks scheduled at this level #T is smaller than the number
768 # of slots available #S, only use the first #T slots, or the first slot on
769 # each node, whichever number is greater.
770 #
771 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
772 # based on these numbers.  Using fewer slots makes more resources available
773 # to each individual task, which should normally be a better strategy when
774 # there are fewer of them running with less parallelism.
775 #
776 # Note that this calculation is not redone if the initial tasks at
777 # this level queue more tasks at the same level.  This may harm
778 # overall task throughput for that level.
779 my @freeslot;
780 if ($initial_tasks_this_level < @node) {
781   @freeslot = (0..$#node);
782 } elsif ($initial_tasks_this_level < @slot) {
783   @freeslot = (0..$initial_tasks_this_level - 1);
784 } else {
785   @freeslot = (0..$#slot);
786 }
787 my $round_num_freeslots = scalar(@freeslot);
788 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
789
790 my %round_max_slots = ();
791 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
792   my $this_slot = $slot[$freeslot[$ii]];
793   my $node_name = $this_slot->{node}->{name};
794   $round_max_slots{$node_name} ||= $this_slot->{cpu};
795   last if (scalar(keys(%round_max_slots)) >= @node);
796 }
797
798 Log(undef, "start level $level with $round_num_freeslots slots");
799 my @holdslot;
800 my %reader;
801 my $progress_is_dirty = 1;
802 my $progress_stats_updated = 0;
803
804 update_progress_stats();
805
806
807 THISROUND:
808 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
809 {
810   # Don't create new tasks if we already know the job's final result.
811   last if defined($main::success);
812
813   my $id = $jobstep_todo[$todo_ptr];
814   my $Jobstep = $jobstep[$id];
815   if ($Jobstep->{level} != $level)
816   {
817     next;
818   }
819
820   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
821   set_nonblocking($reader{$id});
822
823   my $childslot = $freeslot[0];
824   my $childnode = $slot[$childslot]->{node};
825   my $childslotname = join (".",
826                             $slot[$childslot]->{node}->{name},
827                             $slot[$childslot]->{cpu});
828
829   my $childpid = fork();
830   if ($childpid == 0)
831   {
832     $SIG{'INT'} = 'DEFAULT';
833     $SIG{'QUIT'} = 'DEFAULT';
834     $SIG{'TERM'} = 'DEFAULT';
835
836     foreach (values (%reader))
837     {
838       close($_);
839     }
840     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
841     open(STDOUT,">&writer") or croak ($!);
842     open(STDERR,">&writer") or croak ($!);
843
844     undef $dbh;
845     undef $sth;
846
847     delete $ENV{"GNUPGHOME"};
848     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
849     $ENV{"TASK_QSEQUENCE"} = $id;
850     $ENV{"TASK_SEQUENCE"} = $level;
851     $ENV{"JOB_SCRIPT"} = $Job->{script};
852     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
853       $param =~ tr/a-z/A-Z/;
854       $ENV{"JOB_PARAMETER_$param"} = $value;
855     }
856     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
857     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
858     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
859     $ENV{"HOME"} = $ENV{"TASK_WORK"};
860     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
861     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
862     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
863
864     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
865
866     $ENV{"GZIP"} = "-n";
867
868     my @srunargs = (
869       "srun",
870       "--nodelist=".$childnode->{name},
871       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
872       "--job-name=$job_id.$id.$$",
873         );
874
875     my $stdbuf = " stdbuf --output=0 --error=0 ";
876
877     my $arv_file_cache = "";
878     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
879       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
880     }
881
882     my $command =
883         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
884         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
885         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
886         # These environment variables get used explicitly later in
887         # $command.  No tool is expected to read these values directly.
888         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
889         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
890         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
891         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
892         .q{&& declare -a VOLUMES=() }
893         .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
894         .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
895         .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
896
897     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
898     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
899     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
900
901     if ($docker_hash)
902     {
903       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
904       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
905       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
906       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
907       # We only set memory limits if Docker lets us limit both memory and swap.
908       # Memory limits alone have been supported longer, but subprocesses tend
909       # to get SIGKILL if they exceed that without any swap limit set.
910       # See #5642 for additional background.
911       if ($docker_limitmem) {
912         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
913       }
914
915       # The source tree and $destdir directory (which we have
916       # installed on the worker host) are available in the container,
917       # under the same path.
918       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
919       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
920
921       # Currently, we make the "by_pdh" directory in arv-mount's mount
922       # point appear at /keep inside the container (instead of using
923       # the same path as the host like we do with CRUNCH_SRC and
924       # CRUNCH_INSTALL). However, crunch scripts and utilities must
925       # not rely on this. They must use $TASK_KEEPMOUNT.
926       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
927       $ENV{TASK_KEEPMOUNT} = "/keep";
928
929       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
930       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
931       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
932
933       # TASK_WORK is almost exactly like a docker data volume: it
934       # starts out empty, is writable, and persists until no
935       # containers use it any more. We don't use --volumes-from to
936       # share it with other containers: it is only accessible to this
937       # task, and it goes away when this task stops.
938       #
939       # However, a docker data volume is writable only by root unless
940       # the mount point already happens to exist in the container with
941       # different permissions. Therefore, we [1] assume /tmp already
942       # exists in the image and is writable by the crunch user; [2]
943       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
944       # writable if they are created by docker while setting up the
945       # other --volumes); and [3] create $TASK_WORK inside the
946       # container using $build_script.
947       $command .= "--volume=/tmp ";
948       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
949       $ENV{"HOME"} = $ENV{"TASK_WORK"};
950       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
951
952       # TODO: Share a single JOB_WORK volume across all task
953       # containers on a given worker node, and delete it when the job
954       # ends (and, in case that doesn't work, when the next job
955       # starts).
956       #
957       # For now, use the same approach as TASK_WORK above.
958       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
959
960       # Bind mount the crunchrunner binary and host TLS certificates file into
961       # the container.
962       $command .= '"${VOLUMES[@]}" ';
963
964       while (my ($env_key, $env_val) = each %ENV)
965       {
966         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
967           $command .= "--env=\Q$env_key=$env_val\E ";
968         }
969       }
970       $command .= "--env=\QHOME=$ENV{HOME}\E ";
971       $command .= "\Q$docker_hash\E ";
972
973       if ($Job->{arvados_sdk_version}) {
974         $command .= $stdbuf;
975         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
976       } else {
977         $command .= "/bin/sh -c \'python -c " .
978             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
979             ">&2 2>/dev/null; " .
980             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
981             "if which stdbuf >/dev/null ; then " .
982             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
983             " else " .
984             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
985             " fi\'";
986       }
987     } else {
988       # Non-docker run
989       $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
990       $command .= $stdbuf;
991       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
992     }
993
994     my @execargs = ('bash', '-c', $command);
995     srun (\@srunargs, \@execargs, undef, $build_script);
996     # exec() failed, we assume nothing happened.
997     die "srun() failed on build script\n";
998   }
999   close("writer");
1000   if (!defined $childpid)
1001   {
1002     close $reader{$id};
1003     delete $reader{$id};
1004     next;
1005   }
1006   shift @freeslot;
1007   $proc{$childpid} = {
1008     jobstepidx => $id,
1009     time => time,
1010     slot => $childslot,
1011     jobstepname => "$job_id.$id.$childpid",
1012   };
1013   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1014   $slot[$childslot]->{pid} = $childpid;
1015
1016   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1017   Log ($id, "child $childpid started on $childslotname");
1018   $Jobstep->{starttime} = time;
1019   $Jobstep->{node} = $childnode->{name};
1020   $Jobstep->{slotindex} = $childslot;
1021   delete $Jobstep->{stderr};
1022   delete $Jobstep->{finishtime};
1023   delete $Jobstep->{tempfail};
1024
1025   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1026   retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1027
1028   splice @jobstep_todo, $todo_ptr, 1;
1029   --$todo_ptr;
1030
1031   $progress_is_dirty = 1;
1032
1033   while (!@freeslot
1034          ||
1035          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1036   {
1037     last THISROUND if $main::please_freeze;
1038     if ($main::please_info)
1039     {
1040       $main::please_info = 0;
1041       freeze();
1042       create_output_collection();
1043       save_meta(1);
1044       update_progress_stats();
1045     }
1046     my $gotsome
1047         = readfrompipes ()
1048         + reapchildren ();
1049     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1050     {
1051       check_refresh_wanted();
1052       check_squeue();
1053       update_progress_stats();
1054     }
1055     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1056     {
1057       update_progress_stats();
1058     }
1059     if (!$gotsome) {
1060       select (undef, undef, undef, 0.1);
1061     }
1062     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1063                                         $_->{node}->{hold_count} < 4 } @slot);
1064     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1065         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1066     {
1067       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1068           .($thisround_failed+$thisround_succeeded)
1069           .") -- giving up on this round";
1070       Log (undef, $message);
1071       last THISROUND;
1072     }
1073
1074     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1075     for (my $i=$#freeslot; $i>=0; $i--) {
1076       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1077         push @holdslot, (splice @freeslot, $i, 1);
1078       }
1079     }
1080     for (my $i=$#holdslot; $i>=0; $i--) {
1081       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1082         push @freeslot, (splice @holdslot, $i, 1);
1083       }
1084     }
1085
1086     # give up if no nodes are succeeding
1087     if ($working_slot_count < 1) {
1088       Log(undef, "Every node has failed -- giving up");
1089       last THISROUND;
1090     }
1091   }
1092 }
1093
1094
1095 push @freeslot, splice @holdslot;
1096 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1097
1098
1099 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1100 while (%proc)
1101 {
1102   if ($main::please_continue) {
1103     $main::please_continue = 0;
1104     goto THISROUND;
1105   }
1106   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1107   readfrompipes ();
1108   if (!reapchildren())
1109   {
1110     check_refresh_wanted();
1111     check_squeue();
1112     update_progress_stats();
1113     select (undef, undef, undef, 0.1);
1114     killem (keys %proc) if $main::please_freeze;
1115   }
1116 }
1117
1118 update_progress_stats();
1119 freeze_if_want_freeze();
1120
1121
1122 if (!defined $main::success)
1123 {
1124   if (!@jobstep_todo) {
1125     $main::success = 1;
1126   } elsif ($working_slot_count < 1) {
1127     save_output_collection();
1128     save_meta();
1129     exit_retry_unlocked();
1130   } elsif ($thisround_succeeded == 0 &&
1131            ($thisround_failed == 0 || $thisround_failed > 4)) {
1132     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1133     Log (undef, $message);
1134     $main::success = 0;
1135   }
1136 }
1137
1138 goto ONELEVEL if !defined $main::success;
1139
1140
1141 release_allocation();
1142 freeze();
1143 my $collated_output = save_output_collection();
1144 Log (undef, "finish");
1145
1146 my $final_log = save_meta();
1147
1148 my $final_state;
1149 if ($collated_output && $final_log && $main::success) {
1150   $final_state = 'Complete';
1151 } else {
1152   $final_state = 'Failed';
1153 }
1154 $Job->update_attributes('state' => $final_state);
1155
1156 exit (($final_state eq 'Complete') ? 0 : 1);
1157
1158
1159
1160 sub update_progress_stats
1161 {
1162   $progress_stats_updated = time;
1163   return if !$progress_is_dirty;
1164   my ($todo, $done, $running) = (scalar @jobstep_todo,
1165                                  scalar @jobstep_done,
1166                                  scalar keys(%proc));
1167   $Job->{'tasks_summary'} ||= {};
1168   $Job->{'tasks_summary'}->{'todo'} = $todo;
1169   $Job->{'tasks_summary'}->{'done'} = $done;
1170   $Job->{'tasks_summary'}->{'running'} = $running;
1171   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1172   Log (undef, "status: $done done, $running running, $todo todo");
1173   $progress_is_dirty = 0;
1174 }
1175
1176
1177
1178 sub reapchildren
1179 {
1180   my $children_reaped = 0;
1181   my @successful_task_uuids = ();
1182
1183   while((my $pid = waitpid (-1, WNOHANG)) > 0)
1184   {
1185     my $childstatus = $?;
1186
1187     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1188                     . "."
1189                     . $slot[$proc{$pid}->{slot}]->{cpu});
1190     my $jobstepidx = $proc{$pid}->{jobstepidx};
1191
1192     readfrompipes_after_exit ($jobstepidx);
1193
1194     $children_reaped++;
1195     my $elapsed = time - $proc{$pid}->{time};
1196     my $Jobstep = $jobstep[$jobstepidx];
1197
1198     my $exitvalue = $childstatus >> 8;
1199     my $exitinfo = "exit ".exit_status_s($childstatus);
1200     $Jobstep->{'arvados_task'}->reload;
1201     my $task_success = $Jobstep->{'arvados_task'}->{success};
1202
1203     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1204
1205     if (!defined $task_success) {
1206       # task did not indicate one way or the other --> fail
1207       Log($jobstepidx, sprintf(
1208             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1209             exit_status_s($childstatus)));
1210       $Jobstep->{'arvados_task'}->{success} = 0;
1211       retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1212       $task_success = 0;
1213     }
1214
1215     if (!$task_success)
1216     {
1217       my $temporary_fail;
1218       $temporary_fail ||= $Jobstep->{tempfail};
1219       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1220
1221       ++$thisround_failed;
1222       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1223
1224       # Check for signs of a failed or misconfigured node
1225       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1226           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1227         # Don't count this against jobstep failure thresholds if this
1228         # node is already suspected faulty and srun exited quickly
1229         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1230             $elapsed < 5) {
1231           Log ($jobstepidx, "blaming failure on suspect node " .
1232                $slot[$proc{$pid}->{slot}]->{node}->{name});
1233           $temporary_fail ||= 1;
1234         }
1235         ban_node_by_slot($proc{$pid}->{slot});
1236       }
1237
1238       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1239                                 ++$Jobstep->{'failures'},
1240                                 $temporary_fail ? 'temporary' : 'permanent',
1241                                 $elapsed));
1242
1243       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1244         # Give up on this task, and the whole job
1245         $main::success = 0;
1246       }
1247       # Put this task back on the todo queue
1248       push @jobstep_todo, $jobstepidx;
1249       $Job->{'tasks_summary'}->{'failed'}++;
1250     }
1251     else # task_success
1252     {
1253       push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1254       ++$thisround_succeeded;
1255       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1256       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1257       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1258       push @jobstep_done, $jobstepidx;
1259       Log ($jobstepidx, "success in $elapsed seconds");
1260     }
1261     $Jobstep->{exitcode} = $childstatus;
1262     $Jobstep->{finishtime} = time;
1263     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1264     retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
1265     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1266                               length($Jobstep->{'arvados_task'}->{output}),
1267                               $Jobstep->{'arvados_task'}->{output}));
1268
1269     close $reader{$jobstepidx};
1270     delete $reader{$jobstepidx};
1271     delete $slot[$proc{$pid}->{slot}]->{pid};
1272     push @freeslot, $proc{$pid}->{slot};
1273     delete $proc{$pid};
1274
1275     $progress_is_dirty = 1;
1276   }
1277
1278   if (scalar(@successful_task_uuids) > 0)
1279   {
1280     Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1281     # Load new tasks
1282     my $newtask_list = [];
1283     my $newtask_results;
1284     do {
1285       $newtask_results = api_call(
1286         "job_tasks/list",
1287         'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1288         'order' => 'qsequence',
1289         'offset' => scalar(@$newtask_list),
1290           );
1291       push(@$newtask_list, @{$newtask_results->{items}});
1292     } while (@{$newtask_results->{items}});
1293     Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1294     foreach my $arvados_task (@$newtask_list) {
1295       my $jobstep = {
1296         'level' => $arvados_task->{'sequence'},
1297         'failures' => 0,
1298         'arvados_task' => $arvados_task
1299       };
1300       push @jobstep, $jobstep;
1301       push @jobstep_todo, $#jobstep;
1302     }
1303   }
1304
1305   return $children_reaped;
1306 }
1307
1308 sub check_refresh_wanted
1309 {
1310   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1311   if (@stat &&
1312       $stat[9] > $latest_refresh &&
1313       # ...and we have actually locked the job record...
1314       $job_id eq $Job->{'uuid'}) {
1315     $latest_refresh = scalar time;
1316     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1317     for my $attr ('cancelled_at',
1318                   'cancelled_by_user_uuid',
1319                   'cancelled_by_client_uuid',
1320                   'state') {
1321       $Job->{$attr} = $Job2->{$attr};
1322     }
1323     if ($Job->{'state'} ne "Running") {
1324       if ($Job->{'state'} eq "Cancelled") {
1325         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1326       } else {
1327         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1328       }
1329       $main::success = 0;
1330       $main::please_freeze = 1;
1331     }
1332   }
1333 }
1334
1335 sub check_squeue
1336 {
1337   my $last_squeue_check = $squeue_checked;
1338
1339   # Do not call `squeue` or check the kill list more than once every
1340   # 15 seconds.
1341   return if $last_squeue_check > time - 15;
1342   $squeue_checked = time;
1343
1344   # Look for children from which we haven't received stderr data since
1345   # the last squeue check. If no such children exist, all procs are
1346   # alive and there's no need to even look at squeue.
1347   #
1348   # As long as the crunchstat poll interval (10s) is shorter than the
1349   # squeue check interval (15s) this should make the squeue check an
1350   # infrequent event.
1351   my $silent_procs = 0;
1352   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1353   {
1354     if (!exists($js->{stderr_at}))
1355     {
1356       $js->{stderr_at} = 0;
1357     }
1358     if ($js->{stderr_at} < $last_squeue_check)
1359     {
1360       $silent_procs++;
1361     }
1362   }
1363   return if $silent_procs == 0;
1364
1365   # use killem() on procs whose killtime is reached
1366   while (my ($pid, $procinfo) = each %proc)
1367   {
1368     my $js = $jobstep[$procinfo->{jobstepidx}];
1369     if (exists $procinfo->{killtime}
1370         && $procinfo->{killtime} <= time
1371         && $js->{stderr_at} < $last_squeue_check)
1372     {
1373       my $sincewhen = "";
1374       if ($js->{stderr_at}) {
1375         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1376       }
1377       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1378       killem ($pid);
1379     }
1380   }
1381
1382   if (!$have_slurm)
1383   {
1384     # here is an opportunity to check for mysterious problems with local procs
1385     return;
1386   }
1387
1388   # Get a list of steps still running.  Note: squeue(1) says --steps
1389   # selects a format (which we override anyway) and allows us to
1390   # specify which steps we're interested in (which we don't).
1391   # Importantly, it also changes the meaning of %j from "job name" to
1392   # "step name" and (although this isn't mentioned explicitly in the
1393   # docs) switches from "one line per job" mode to "one line per step"
1394   # mode. Without it, we'd just get a list of one job, instead of a
1395   # list of N steps.
1396   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1397   if ($? != 0)
1398   {
1399     Log(undef, "warning: squeue exit status $? ($!)");
1400     return;
1401   }
1402   chop @squeue;
1403
1404   # which of my jobsteps are running, according to squeue?
1405   my %ok;
1406   for my $jobstepname (@squeue)
1407   {
1408     $ok{$jobstepname} = 1;
1409   }
1410
1411   # Check for child procs >60s old and not mentioned by squeue.
1412   while (my ($pid, $procinfo) = each %proc)
1413   {
1414     if ($procinfo->{time} < time - 60
1415         && $procinfo->{jobstepname}
1416         && !exists $ok{$procinfo->{jobstepname}}
1417         && !exists $procinfo->{killtime})
1418     {
1419       # According to slurm, this task has ended (successfully or not)
1420       # -- but our srun child hasn't exited. First we must wait (30
1421       # seconds) in case this is just a race between communication
1422       # channels. Then, if our srun child process still hasn't
1423       # terminated, we'll conclude some slurm communication
1424       # error/delay has caused the task to die without notifying srun,
1425       # and we'll kill srun ourselves.
1426       $procinfo->{killtime} = time + 30;
1427       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1428     }
1429   }
1430 }
1431
1432 sub check_sinfo
1433 {
1434   # If a node fails in a multi-node "srun" call during job setup, the call
1435   # may hang instead of exiting with a nonzero code.  This function checks
1436   # "sinfo" for the health of the nodes that were allocated and ensures that
1437   # they are all still in the "alloc" state.  If a node that is allocated to
1438   # this job is not in "alloc" state, then set please_freeze.
1439   #
1440   # This is only called from srun_sync() for node configuration.  If a
1441   # node fails doing actual work, there are other recovery mechanisms.
1442
1443   # Do not call `sinfo` more than once every 15 seconds.
1444   return if $sinfo_checked > time - 15;
1445   $sinfo_checked = time;
1446
1447   # The output format "%t" means output node states.
1448   my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
1449   if ($? != 0)
1450   {
1451     Log(undef, "warning: sinfo exit status $? ($!)");
1452     return;
1453   }
1454   chop @sinfo;
1455
1456   foreach (@sinfo)
1457   {
1458     if ($_ != "alloc" && $_ != "alloc*") {
1459       $main::please_freeze = 1;
1460     }
1461   }
1462 }
1463
1464 sub release_allocation
1465 {
1466   if ($have_slurm)
1467   {
1468     Log (undef, "release job allocation");
1469     system "scancel $ENV{SLURM_JOB_ID}";
1470   }
1471 }
1472
1473
1474 sub readfrompipes
1475 {
1476   my $gotsome = 0;
1477   my %fd_job;
1478   my $sel = IO::Select->new();
1479   foreach my $jobstepidx (keys %reader)
1480   {
1481     my $fd = $reader{$jobstepidx};
1482     $sel->add($fd);
1483     $fd_job{$fd} = $jobstepidx;
1484
1485     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1486       $sel->add($stdout_fd);
1487       $fd_job{$stdout_fd} = $jobstepidx;
1488     }
1489   }
1490   # select on all reader fds with 0.1s timeout
1491   my @ready_fds = $sel->can_read(0.1);
1492   foreach my $fd (@ready_fds)
1493   {
1494     my $buf;
1495     if (0 < sysread ($fd, $buf, 65536))
1496     {
1497       $gotsome = 1;
1498       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1499
1500       my $jobstepidx = $fd_job{$fd};
1501       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1502         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1503         next;
1504       }
1505
1506       $jobstep[$jobstepidx]->{stderr_at} = time;
1507       $jobstep[$jobstepidx]->{stderr} .= $buf;
1508
1509       # Consume everything up to the last \n
1510       preprocess_stderr ($jobstepidx);
1511
1512       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1513       {
1514         # If we get a lot of stderr without a newline, chop off the
1515         # front to avoid letting our buffer grow indefinitely.
1516         substr ($jobstep[$jobstepidx]->{stderr},
1517                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1518       }
1519     }
1520   }
1521   return $gotsome;
1522 }
1523
1524
1525 # Consume all full lines of stderr for a jobstep. Everything after the
1526 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1527 # returning.
1528 sub preprocess_stderr
1529 {
1530   my $jobstepidx = shift;
1531   # slotindex is only defined for children running Arvados job tasks.
1532   # Be prepared to handle the undef case (for setup srun calls, etc.).
1533   my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1534
1535   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1536     my $line = $1;
1537     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1538     Log ($jobstepidx, "stderr $line");
1539     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
1540       # If the allocation is revoked, we can't possibly continue, so mark all
1541       # nodes as failed.  This will cause the overall exit code to be
1542       # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
1543       # this job.
1544       $main::please_freeze = 1;
1545       foreach my $st (@slot) {
1546         $st->{node}->{fail_count}++;
1547       }
1548     }
1549     elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
1550       $jobstep[$jobstepidx]->{tempfail} = 1;
1551       if (defined($job_slot_index)) {
1552         $slot[$job_slot_index]->{node}->{fail_count}++;
1553         ban_node_by_slot($job_slot_index);
1554       }
1555     }
1556     elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
1557       $jobstep[$jobstepidx]->{tempfail} = 1;
1558       ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
1559     }
1560     elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
1561       $jobstep[$jobstepidx]->{tempfail} = 1;
1562     }
1563   }
1564 }
1565
1566
1567 # Read whatever is still available on its stderr+stdout pipes after
1568 # the given child process has exited.
1569 sub readfrompipes_after_exit
1570 {
1571   my $jobstepidx = shift;
1572
1573   # The fact that the child has exited allows some convenient
1574   # simplifications: (1) all data must have already been written, so
1575   # there's no need to wait for more once sysread returns 0; (2) the
1576   # total amount of data available is bounded by the pipe buffer size,
1577   # so it's safe to read everything into one string.
1578   my $buf;
1579   while (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) {
1580     $jobstep[$jobstepidx]->{stderr_at} = time;
1581     $jobstep[$jobstepidx]->{stderr} .= $buf;
1582   }
1583   if ($jobstep[$jobstepidx]->{stdout_r}) {
1584     while (0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) {
1585       $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1586     }
1587   }
1588   preprocess_stderr ($jobstepidx);
1589
1590   map {
1591     Log ($jobstepidx, "stderr $_");
1592   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1593   $jobstep[$jobstepidx]->{stderr} = '';
1594 }
1595
1596 sub fetch_block
1597 {
1598   my $hash = shift;
1599   my $keep;
1600   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1601     Log(undef, "fetch_block run error from arv-get $hash: $!");
1602     return undef;
1603   }
1604   my $output_block = "";
1605   while (1) {
1606     my $buf;
1607     my $bytes = sysread($keep, $buf, 1024 * 1024);
1608     if (!defined $bytes) {
1609       Log(undef, "fetch_block read error from arv-get: $!");
1610       $output_block = undef;
1611       last;
1612     } elsif ($bytes == 0) {
1613       # sysread returns 0 at the end of the pipe.
1614       last;
1615     } else {
1616       # some bytes were read into buf.
1617       $output_block .= $buf;
1618     }
1619   }
1620   close $keep;
1621   if ($?) {
1622     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1623     $output_block = undef;
1624   }
1625   return $output_block;
1626 }
1627
1628 # Create a collection by concatenating the output of all tasks (each
1629 # task's output is either a manifest fragment, a locator for a
1630 # manifest fragment stored in Keep, or nothing at all). Return the
1631 # portable_data_hash of the new collection.
1632 sub create_output_collection
1633 {
1634   Log (undef, "collate");
1635
1636   my ($child_out, $child_in);
1637   # This depends on the python-arvados-python-client package, which needs to be installed
1638   # on the machine running crunch-dispatch (typically, the API server).
1639   my $pid = open2($child_out, $child_in, '/usr/share/python2.7/dist/python-arvados-python-client/bin/python', '-c', q{
1640 import arvados
1641 import sys
1642 print (arvados.api("v1").collections().
1643        create(body={"manifest_text": sys.stdin.read(),
1644                     "owner_uuid": sys.argv[2]}).
1645        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1646 }, retry_count(), $Job->{owner_uuid});
1647
1648   my $task_idx = -1;
1649   my $manifest_size = 0;
1650   for (@jobstep)
1651   {
1652     ++$task_idx;
1653     my $output = $_->{'arvados_task'}->{output};
1654     next if (!defined($output));
1655     my $next_write;
1656     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1657       $next_write = fetch_block($output);
1658     } else {
1659       $next_write = $output;
1660     }
1661     if (defined($next_write)) {
1662       if (!defined(syswrite($child_in, $next_write))) {
1663         # There's been an error writing.  Stop the loop.
1664         # We'll log details about the exit code later.
1665         last;
1666       } else {
1667         $manifest_size += length($next_write);
1668       }
1669     } else {
1670       my $uuid = $_->{'arvados_task'}->{'uuid'};
1671       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1672       $main::success = 0;
1673     }
1674   }
1675   close($child_in);
1676   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1677
1678   my $joboutput;
1679   my $s = IO::Select->new($child_out);
1680   if ($s->can_read(120)) {
1681     sysread($child_out, $joboutput, 1024 * 1024);
1682     waitpid($pid, 0);
1683     if ($?) {
1684       Log(undef, "output collection creation exited " . exit_status_s($?));
1685       $joboutput = undef;
1686     } else {
1687       chomp($joboutput);
1688     }
1689   } else {
1690     Log (undef, "timed out while creating output collection");
1691     foreach my $signal (2, 2, 2, 15, 15, 9) {
1692       kill($signal, $pid);
1693       last if waitpid($pid, WNOHANG) == -1;
1694       sleep(1);
1695     }
1696   }
1697   close($child_out);
1698
1699   return $joboutput;
1700 }
1701
1702 # Calls create_output_collection, logs the result, and returns it.
1703 # If that was successful, save that as the output in the job record.
1704 sub save_output_collection {
1705   my $collated_output = create_output_collection();
1706
1707   if (!$collated_output) {
1708     Log(undef, "Failed to write output collection");
1709   }
1710   else {
1711     Log(undef, "job output $collated_output");
1712     $Job->update_attributes('output' => $collated_output);
1713   }
1714   return $collated_output;
1715 }
1716
1717 sub killem
1718 {
1719   foreach (@_)
1720   {
1721     my $sig = 2;                # SIGINT first
1722     if (exists $proc{$_}->{"sent_$sig"} &&
1723         time - $proc{$_}->{"sent_$sig"} > 4)
1724     {
1725       $sig = 15;                # SIGTERM if SIGINT doesn't work
1726     }
1727     if (exists $proc{$_}->{"sent_$sig"} &&
1728         time - $proc{$_}->{"sent_$sig"} > 4)
1729     {
1730       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1731     }
1732     if (!exists $proc{$_}->{"sent_$sig"})
1733     {
1734       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1735       kill $sig, $_;
1736       select (undef, undef, undef, 0.1);
1737       if ($sig == 2)
1738       {
1739         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1740       }
1741       $proc{$_}->{"sent_$sig"} = time;
1742       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1743     }
1744   }
1745 }
1746
1747
1748 sub fhbits
1749 {
1750   my($bits);
1751   for (@_) {
1752     vec($bits,fileno($_),1) = 1;
1753   }
1754   $bits;
1755 }
1756
1757
1758 # Send log output to Keep via arv-put.
1759 #
1760 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1761 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1762 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1763 # $log_pipe_pid is the pid of the arv-put subprocess.
1764 #
1765 # The only functions that should access these variables directly are:
1766 #
1767 # log_writer_start($logfilename)
1768 #     Starts an arv-put pipe, reading data on stdin and writing it to
1769 #     a $logfilename file in an output collection.
1770 #
1771 # log_writer_read_output([$timeout])
1772 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1773 #     Passes $timeout to the select() call, with a default of 0.01.
1774 #     Returns the result of the last read() call on $log_pipe_out, or
1775 #     -1 if read() wasn't called because select() timed out.
1776 #     Only other log_writer_* functions should need to call this.
1777 #
1778 # log_writer_send($txt)
1779 #     Writes $txt to the output log collection.
1780 #
1781 # log_writer_finish()
1782 #     Closes the arv-put pipe and returns the output that it produces.
1783 #
1784 # log_writer_is_active()
1785 #     Returns a true value if there is currently a live arv-put
1786 #     process, false otherwise.
1787 #
1788 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1789     $log_pipe_pid);
1790
1791 sub log_writer_start($)
1792 {
1793   my $logfilename = shift;
1794   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1795                         'arv-put',
1796                         '--stream',
1797                         '--retries', '6',
1798                         '--filename', $logfilename,
1799                         '-');
1800   $log_pipe_out_buf = "";
1801   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1802 }
1803
1804 sub log_writer_read_output {
1805   my $timeout = shift || 0.01;
1806   my $read = -1;
1807   while ($read && $log_pipe_out_select->can_read($timeout)) {
1808     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1809                  length($log_pipe_out_buf));
1810   }
1811   if (!defined($read)) {
1812     Log(undef, "error reading log manifest from arv-put: $!");
1813   }
1814   return $read;
1815 }
1816
1817 sub log_writer_send($)
1818 {
1819   my $txt = shift;
1820   print $log_pipe_in $txt;
1821   log_writer_read_output();
1822 }
1823
1824 sub log_writer_finish()
1825 {
1826   return unless $log_pipe_pid;
1827
1828   close($log_pipe_in);
1829
1830   my $logger_failed = 0;
1831   my $read_result = log_writer_read_output(600);
1832   if ($read_result == -1) {
1833     $logger_failed = -1;
1834     Log (undef, "timed out reading from 'arv-put'");
1835   } elsif ($read_result != 0) {
1836     $logger_failed = -2;
1837     Log(undef, "failed to read arv-put log manifest to EOF");
1838   }
1839
1840   waitpid($log_pipe_pid, 0);
1841   if ($?) {
1842     $logger_failed ||= $?;
1843     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1844   }
1845
1846   close($log_pipe_out);
1847   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1848   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1849       $log_pipe_out_select = undef;
1850
1851   return $arv_put_output;
1852 }
1853
1854 sub log_writer_is_active() {
1855   return $log_pipe_pid;
1856 }
1857
1858 sub Log                         # ($jobstepidx, $logmessage)
1859 {
1860   my ($jobstepidx, $logmessage) = @_;
1861   if ($logmessage =~ /\n/) {
1862     for my $line (split (/\n/, $_[1])) {
1863       Log ($jobstepidx, $line);
1864     }
1865     return;
1866   }
1867   my $fh = select STDERR; $|=1; select $fh;
1868   my $task_qseq = '';
1869   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1870     $task_qseq = $jobstepidx;
1871   }
1872   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1873   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1874   $message .= "\n";
1875   my $datetime;
1876   if (log_writer_is_active() || -t STDERR) {
1877     my @gmtime = gmtime;
1878     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1879                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1880   }
1881   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1882
1883   if (log_writer_is_active()) {
1884     log_writer_send($datetime . " " . $message);
1885   }
1886 }
1887
1888
1889 sub croak
1890 {
1891   my ($package, $file, $line) = caller;
1892   my $message = "@_ at $file line $line\n";
1893   Log (undef, $message);
1894   release_allocation();
1895   freeze() if @jobstep_todo;
1896   create_output_collection() if @jobstep_todo;
1897   cleanup();
1898   save_meta();
1899   die;
1900 }
1901
1902
1903 sub cleanup
1904 {
1905   return unless $Job;
1906   if ($Job->{'state'} eq 'Cancelled') {
1907     $Job->update_attributes('finished_at' => scalar gmtime);
1908   } else {
1909     $Job->update_attributes('state' => 'Failed');
1910   }
1911 }
1912
1913
1914 sub save_meta
1915 {
1916   my $justcheckpoint = shift; # false if this will be the last meta saved
1917   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1918   return unless log_writer_is_active();
1919   my $log_manifest = log_writer_finish();
1920   return unless defined($log_manifest);
1921
1922   if ($Job->{log}) {
1923     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1924     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1925   }
1926
1927   my $log_coll = api_call(
1928     "collections/create", ensure_unique_name => 1, collection => {
1929       manifest_text => $log_manifest,
1930       owner_uuid => $Job->{owner_uuid},
1931       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1932     });
1933   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1934   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1935
1936   return $log_coll->{portable_data_hash};
1937 }
1938
1939
1940 sub freeze_if_want_freeze
1941 {
1942   if ($main::please_freeze)
1943   {
1944     release_allocation();
1945     if (@_)
1946     {
1947       # kill some srun procs before freeze+stop
1948       map { $proc{$_} = {} } @_;
1949       while (%proc)
1950       {
1951         killem (keys %proc);
1952         select (undef, undef, undef, 0.1);
1953         my $died;
1954         while (($died = waitpid (-1, WNOHANG)) > 0)
1955         {
1956           delete $proc{$died};
1957         }
1958       }
1959     }
1960     freeze();
1961     create_output_collection();
1962     cleanup();
1963     save_meta();
1964     exit 1;
1965   }
1966 }
1967
1968
1969 sub freeze
1970 {
1971   Log (undef, "Freeze not implemented");
1972   return;
1973 }
1974
1975
1976 sub thaw
1977 {
1978   croak ("Thaw not implemented");
1979 }
1980
1981
1982 sub freezequote
1983 {
1984   my $s = shift;
1985   $s =~ s/\\/\\\\/g;
1986   $s =~ s/\n/\\n/g;
1987   return $s;
1988 }
1989
1990
1991 sub freezeunquote
1992 {
1993   my $s = shift;
1994   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1995   return $s;
1996 }
1997
1998 sub srun_sync
1999 {
2000   my $srunargs = shift;
2001   my $execargs = shift;
2002   my $opts = shift || {};
2003   my $stdin = shift;
2004
2005   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
2006   Log (undef, "$label: start");
2007
2008   my ($stderr_r, $stderr_w);
2009   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
2010
2011   my ($stdout_r, $stdout_w);
2012   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
2013
2014   my $started_srun = scalar time;
2015
2016   my $srunpid = fork();
2017   if ($srunpid == 0)
2018   {
2019     close($stderr_r);
2020     close($stdout_r);
2021     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
2022     fcntl($stdout_w, F_SETFL, 0) or croak($!);
2023     open(STDERR, ">&", $stderr_w) or croak ($!);
2024     open(STDOUT, ">&", $stdout_w) or croak ($!);
2025     srun ($srunargs, $execargs, $opts, $stdin);
2026     exit (1);
2027   }
2028   close($stderr_w);
2029   close($stdout_w);
2030
2031   set_nonblocking($stderr_r);
2032   set_nonblocking($stdout_r);
2033
2034   # Add entries to @jobstep and %proc so check_squeue() and
2035   # freeze_if_want_freeze() can treat it like a job task process.
2036   push @jobstep, {
2037     stderr => '',
2038     stderr_at => 0,
2039     stderr_captured => '',
2040     stdout_r => $stdout_r,
2041     stdout_captured => '',
2042   };
2043   my $jobstepidx = $#jobstep;
2044   $proc{$srunpid} = {
2045     jobstepidx => $jobstepidx,
2046   };
2047   $reader{$jobstepidx} = $stderr_r;
2048
2049   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
2050     my $busy = readfrompipes();
2051     if (!$busy || ($latest_refresh + 2 < scalar time)) {
2052       check_refresh_wanted();
2053       check_squeue();
2054       check_sinfo();
2055     }
2056     if (!$busy) {
2057       select(undef, undef, undef, 0.1);
2058     }
2059     if (($started_srun + $srun_sync_timeout) < scalar time) {
2060       # Exceeded general timeout for "srun_sync" operations, likely
2061       # means something got stuck on the remote node.
2062       Log(undef, "srun_sync exceeded timeout, will fail.");
2063       $main::please_freeze = 1;
2064     }
2065     killem(keys %proc) if $main::please_freeze;
2066   }
2067   my $exited = $?;
2068
2069   readfrompipes_after_exit ($jobstepidx);
2070
2071   Log (undef, "$label: exit ".exit_status_s($exited));
2072
2073   close($stdout_r);
2074   close($stderr_r);
2075   delete $proc{$srunpid};
2076   delete $reader{$jobstepidx};
2077
2078   my $j = pop @jobstep;
2079   # If the srun showed signs of tempfail, ensure the caller treats that as a
2080   # failure case.
2081   if ($main::please_freeze || $j->{tempfail}) {
2082     $exited ||= 255;
2083   }
2084   return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
2085 }
2086
2087
2088 sub srun
2089 {
2090   my $srunargs = shift;
2091   my $execargs = shift;
2092   my $opts = shift || {};
2093   my $stdin = shift;
2094   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
2095
2096   $Data::Dumper::Terse = 1;
2097   $Data::Dumper::Indent = 0;
2098   my $show_cmd = Dumper($args);
2099   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2100   $show_cmd =~ s/\n/ /g;
2101   if ($opts->{fork}) {
2102     Log(undef, "starting: $show_cmd");
2103   } else {
2104     # This is a child process: parent is in charge of reading our
2105     # stderr and copying it to Log() if needed.
2106     warn "starting: $show_cmd\n";
2107   }
2108
2109   if (defined $stdin) {
2110     my $child = open STDIN, "-|";
2111     defined $child or die "no fork: $!";
2112     if ($child == 0) {
2113       print $stdin or die $!;
2114       close STDOUT or die $!;
2115       exit 0;
2116     }
2117   }
2118
2119   return system (@$args) if $opts->{fork};
2120
2121   exec @$args;
2122   warn "ENV size is ".length(join(" ",%ENV));
2123   die "exec failed: $!: @$args";
2124 }
2125
2126
2127 sub ban_node_by_slot {
2128   # Don't start any new jobsteps on this node for 60 seconds
2129   my $slotid = shift;
2130   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2131   $slot[$slotid]->{node}->{hold_count}++;
2132   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2133 }
2134
2135 sub must_lock_now
2136 {
2137   my ($lockfile, $error_message) = @_;
2138   open L, ">", $lockfile or croak("$lockfile: $!");
2139   if (!flock L, LOCK_EX|LOCK_NB) {
2140     croak("Can't lock $lockfile: $error_message\n");
2141   }
2142 }
2143
2144 sub find_docker_image {
2145   # Given a Keep locator, check to see if it contains a Docker image.
2146   # If so, return its stream name and Docker hash.
2147   # If not, return undef for both values.
2148   my $locator = shift;
2149   my ($streamname, $filename);
2150   my $image = api_call("collections/get", uuid => $locator);
2151   if ($image) {
2152     foreach my $line (split(/\n/, $image->{manifest_text})) {
2153       my @tokens = split(/\s+/, $line);
2154       next if (!@tokens);
2155       $streamname = shift(@tokens);
2156       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2157         if (defined($filename)) {
2158           return (undef, undef);  # More than one file in the Collection.
2159         } else {
2160           $filename = (split(/:/, $filedata, 3))[2];
2161           $filename =~ s/\\([0-3][0-7][0-7])/chr(oct($1))/ge;
2162         }
2163       }
2164     }
2165   }
2166   if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
2167     return ($streamname, $1);
2168   } else {
2169     return (undef, undef);
2170   }
2171 }
2172
2173 sub exit_retry_unlocked {
2174   Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED);
2175   exit(EX_RETRY_UNLOCKED);
2176 }
2177
2178 sub retry_count {
2179   # Calculate the number of times an operation should be retried,
2180   # assuming exponential backoff, and that we're willing to retry as
2181   # long as tasks have been running.  Enforce a minimum of 3 retries.
2182   my ($starttime, $endtime, $timediff, $retries);
2183   if (@jobstep) {
2184     $starttime = $jobstep[0]->{starttime};
2185     $endtime = $jobstep[-1]->{finishtime};
2186   }
2187   if (!defined($starttime)) {
2188     $timediff = 0;
2189   } elsif (!defined($endtime)) {
2190     $timediff = time - $starttime;
2191   } else {
2192     $timediff = ($endtime - $starttime) - (time - $endtime);
2193   }
2194   if ($timediff > 0) {
2195     $retries = int(log($timediff) / log(2));
2196   } else {
2197     $retries = 1;  # Use the minimum.
2198   }
2199   return ($retries > 3) ? $retries : 3;
2200 }
2201
2202 sub retry_op {
2203   # Pass in two function references.
2204   # This method will be called with the remaining arguments.
2205   # If it dies, retry it with exponential backoff until it succeeds,
2206   # or until the current retry_count is exhausted.  After each failure
2207   # that can be retried, the second function will be called with
2208   # the current try count (0-based), next try time, and error message.
2209   my $operation = shift;
2210   my $op_text = shift;
2211   my $retries = retry_count();
2212   my $retry_callback = sub {
2213     my ($try_count, $next_try_at, $errmsg) = @_;
2214     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2215     $errmsg =~ s/\s/ /g;
2216     $errmsg =~ s/\s+$//;
2217     my $retry_msg;
2218     if ($next_try_at < time) {
2219       $retry_msg = "Retrying.";
2220     } else {
2221       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2222       $retry_msg = "Retrying at $next_try_fmt.";
2223     }
2224     Log(undef, "$op_text failed: $errmsg. $retry_msg");
2225   };
2226   foreach my $try_count (0..$retries) {
2227     my $next_try = time + (2 ** $try_count);
2228     my $result = eval { $operation->(@_); };
2229     if (!$@) {
2230       return $result;
2231     } elsif ($try_count < $retries) {
2232       $retry_callback->($try_count, $next_try, $@);
2233       my $sleep_time = $next_try - time;
2234       sleep($sleep_time) if ($sleep_time > 0);
2235     }
2236   }
2237   # Ensure the error message ends in a newline, so Perl doesn't add
2238   # retry_op's line number to it.
2239   chomp($@);
2240   die($@ . "\n");
2241 }
2242
2243 sub api_call {
2244   # Pass in a /-separated API method name, and arguments for it.
2245   # This function will call that method, retrying as needed until
2246   # the current retry_count is exhausted, with a log on the first failure.
2247   my $method_name = shift;
2248   my $method = $arv;
2249   foreach my $key (split(/\//, $method_name)) {
2250     $method = $method->{$key};
2251   }
2252   return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
2253 }
2254
2255 sub exit_status_s {
2256   # Given a $?, return a human-readable exit code string like "0" or
2257   # "1" or "0 with signal 1" or "1 with signal 11".
2258   my $exitcode = shift;
2259   my $s = $exitcode >> 8;
2260   if ($exitcode & 0x7f) {
2261     $s .= " with signal " . ($exitcode & 0x7f);
2262   }
2263   if ($exitcode & 0x80) {
2264     $s .= " with core dump";
2265   }
2266   return $s;
2267 }
2268
2269 sub handle_readall {
2270   # Pass in a glob reference to a file handle.
2271   # Read all its contents and return them as a string.
2272   my $fh_glob_ref = shift;
2273   local $/ = undef;
2274   return <$fh_glob_ref>;
2275 }
2276
2277 sub tar_filename_n {
2278   my $n = shift;
2279   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2280 }
2281
2282 sub add_git_archive {
2283   # Pass in a git archive command as a string or list, a la system().
2284   # This method will save its output to be included in the archive sent to the
2285   # build script.
2286   my $git_input;
2287   $git_tar_count++;
2288   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2289     croak("Failed to save git archive: $!");
2290   }
2291   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2292   close($git_input);
2293   waitpid($git_pid, 0);
2294   close(GIT_ARCHIVE);
2295   if ($?) {
2296     croak("Failed to save git archive: git exited " . exit_status_s($?));
2297   }
2298 }
2299
2300 sub combined_git_archive {
2301   # Combine all saved tar archives into a single archive, then return its
2302   # contents in a string.  Return undef if no archives have been saved.
2303   if ($git_tar_count < 1) {
2304     return undef;
2305   }
2306   my $base_tar_name = tar_filename_n(1);
2307   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2308     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2309     if ($tar_exit != 0) {
2310       croak("Error preparing build archive: tar -A exited " .
2311             exit_status_s($tar_exit));
2312     }
2313   }
2314   if (!open(GIT_TAR, "<", $base_tar_name)) {
2315     croak("Could not open build archive: $!");
2316   }
2317   my $tar_contents = handle_readall(\*GIT_TAR);
2318   close(GIT_TAR);
2319   return $tar_contents;
2320 }
2321
2322 sub set_nonblocking {
2323   my $fh = shift;
2324   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2325   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2326 }
2327
2328 __DATA__
2329 #!/usr/bin/env perl
2330 #
2331 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2332 # server invokes this script on individual compute nodes, or localhost if we're
2333 # running a job locally.  It gets called in two modes:
2334 #
2335 # * No arguments: Installation mode.  Read a tar archive from the DATA
2336 #   file handle; it includes the Crunch script's source code, and
2337 #   maybe SDKs as well.  Those should be installed in the proper
2338 #   locations.  This runs outside of any Docker container, so don't try to
2339 #   introspect Crunch's runtime environment.
2340 #
2341 # * With arguments: Crunch script run mode.  This script should set up the
2342 #   environment, then run the command specified in the arguments.  This runs
2343 #   inside any Docker container.
2344
2345 use Fcntl ':flock';
2346 use File::Path qw( make_path remove_tree );
2347 use POSIX qw(getcwd);
2348
2349 use constant TASK_TEMPFAIL => 111;
2350
2351 # Map SDK subdirectories to the path environments they belong to.
2352 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2353
2354 my $destdir = $ENV{"CRUNCH_SRC"};
2355 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2356 my $repo = $ENV{"CRUNCH_SRC_URL"};
2357 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2358 my $job_work = $ENV{"JOB_WORK"};
2359 my $task_work = $ENV{"TASK_WORK"};
2360
2361 open(STDOUT_ORIG, ">&", STDOUT);
2362 open(STDERR_ORIG, ">&", STDERR);
2363
2364 for my $dir ($destdir, $job_work, $task_work) {
2365   if ($dir) {
2366     make_path $dir;
2367     -e $dir or die "Failed to create temporary directory ($dir): $!";
2368   }
2369 }
2370
2371 if ($task_work) {
2372   remove_tree($task_work, {keep_root => 1});
2373 }
2374
2375 ### Crunch script run mode
2376 if (@ARGV) {
2377   # We want to do routine logging during task 0 only.  This gives the user
2378   # the information they need, but avoids repeating the information for every
2379   # task.
2380   my $Log;
2381   if ($ENV{TASK_SEQUENCE} eq "0") {
2382     $Log = sub {
2383       my $msg = shift;
2384       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2385     };
2386   } else {
2387     $Log = sub { };
2388   }
2389
2390   my $python_src = "$install_dir/python";
2391   my $venv_dir = "$job_work/.arvados.venv";
2392   my $venv_built = -e "$venv_dir/bin/activate";
2393   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2394     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2395                  "--python=python2.7", $venv_dir);
2396     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2397     $venv_built = 1;
2398     $Log->("Built Python SDK virtualenv");
2399   }
2400
2401   my @pysdk_version_cmd = ("python", "-c",
2402     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2403   if ($venv_built) {
2404     $Log->("Running in Python SDK virtualenv");
2405     @pysdk_version_cmd = ();
2406     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2407     @ARGV = ("/bin/sh", "-ec",
2408              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2409   } elsif (-d $python_src) {
2410     $Log->("Warning: virtualenv not found inside Docker container default " .
2411            "\$PATH. Can't install Python SDK.");
2412   }
2413
2414   if (@pysdk_version_cmd) {
2415     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2416     my $pysdk_version = <$pysdk_version_pipe>;
2417     close($pysdk_version_pipe);
2418     if ($? == 0) {
2419       chomp($pysdk_version);
2420       $Log->("Using Arvados SDK version $pysdk_version");
2421     } else {
2422       # A lot could've gone wrong here, but pretty much all of it means that
2423       # Python won't be able to load the Arvados SDK.
2424       $Log->("Warning: Arvados SDK not found");
2425     }
2426   }
2427
2428   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2429     my $sdk_path = "$install_dir/$sdk_dir";
2430     if (-d $sdk_path) {
2431       if ($ENV{$sdk_envkey}) {
2432         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2433       } else {
2434         $ENV{$sdk_envkey} = $sdk_path;
2435       }
2436       $Log->("Arvados SDK added to %s", $sdk_envkey);
2437     }
2438   }
2439
2440   exec(@ARGV);
2441   die "Cannot exec `@ARGV`: $!";
2442 }
2443
2444 ### Installation mode
2445 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2446 flock L, LOCK_EX;
2447 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2448   # This exact git archive (source + arvados sdk) is already installed
2449   # here, so there's no need to reinstall it.
2450
2451   # We must consume our DATA section, though: otherwise the process
2452   # feeding it to us will get SIGPIPE.
2453   my $buf;
2454   while (read(DATA, $buf, 65536)) { }
2455
2456   exit(0);
2457 }
2458
2459 unlink "$destdir.archive_hash";
2460 mkdir $destdir;
2461
2462 do {
2463   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2464   local $SIG{PIPE} = "IGNORE";
2465   warn "Extracting archive: $archive_hash\n";
2466   # --ignore-zeros is necessary sometimes: depending on how much NUL
2467   # padding tar -A put on our combined archive (which in turn depends
2468   # on the length of the component archives) tar without
2469   # --ignore-zeros will exit before consuming stdin and cause close()
2470   # to fail on the resulting SIGPIPE.
2471   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2472     die "Error launching 'tar -xC $destdir': $!";
2473   }
2474   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2475   # get SIGPIPE.  We must feed it data incrementally.
2476   my $tar_input;
2477   while (read(DATA, $tar_input, 65536)) {
2478     print TARX $tar_input;
2479   }
2480   if(!close(TARX)) {
2481     die "'tar -xC $destdir' exited $?: $!";
2482   }
2483 };
2484
2485 mkdir $install_dir;
2486
2487 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2488 if (-d $sdk_root) {
2489   foreach my $sdk_lang (("python",
2490                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2491     if (-d "$sdk_root/$sdk_lang") {
2492       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2493         die "Failed to install $sdk_lang SDK: $!";
2494       }
2495     }
2496   }
2497 }
2498
2499 my $python_dir = "$install_dir/python";
2500 if ((-d $python_dir) and can_run("python2.7")) {
2501   open(my $egg_info_pipe, "-|",
2502        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2503   my @egg_info_errors = <$egg_info_pipe>;
2504   close($egg_info_pipe);
2505
2506   if ($?) {
2507     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2508       # egg_info apparently failed because it couldn't ask git for a build tag.
2509       # Specify no build tag.
2510       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2511       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2512       close($pysdk_cfg);
2513     } else {
2514       my $egg_info_exit = $? >> 8;
2515       foreach my $errline (@egg_info_errors) {
2516         warn $errline;
2517       }
2518       warn "python setup.py egg_info failed: exit $egg_info_exit";
2519       exit ($egg_info_exit || 1);
2520     }
2521   }
2522 }
2523
2524 # Hide messages from the install script (unless it fails: shell_or_die
2525 # will show $destdir.log in that case).
2526 open(STDOUT, ">>", "$destdir.log") or die ($!);
2527 open(STDERR, ">&", STDOUT) or die ($!);
2528
2529 if (-e "$destdir/crunch_scripts/install") {
2530     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2531 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2532     # Old version
2533     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2534 } elsif (-e "./install.sh") {
2535     shell_or_die (undef, "./install.sh", $install_dir);
2536 }
2537
2538 if ($archive_hash) {
2539     unlink "$destdir.archive_hash.new";
2540     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2541     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2542 }
2543
2544 close L;
2545
2546 sub can_run {
2547   my $command_name = shift;
2548   open(my $which, "-|", "which", $command_name) or die ($!);
2549   while (<$which>) { }
2550   close($which);
2551   return ($? == 0);
2552 }
2553
2554 sub shell_or_die
2555 {
2556   my $exitcode = shift;
2557
2558   if ($ENV{"DEBUG"}) {
2559     print STDERR "@_\n";
2560   }
2561   if (system (@_) != 0) {
2562     my $err = $!;
2563     my $code = $?;
2564     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2565     open STDERR, ">&STDERR_ORIG";
2566     system ("cat $destdir.log >&2");
2567     warn "@_ failed ($err): $exitstatus";
2568     if (defined($exitcode)) {
2569       exit $exitcode;
2570     }
2571     else {
2572       exit (($code >> 8) || 1);
2573     }
2574   }
2575 }
2576
2577 __DATA__