5642: Explicitly make all swap available under Docker in crunch-job.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105   if ($ENV{"USER"} ne "crunch" && $< != 0) {
106     # use a tmp dir unique for my uid
107     $ENV{"CRUNCH_TMP"} .= "-$<";
108   }
109 }
110
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
114 }
115
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
120
121 my $force_unlock;
122 my $git_dir;
123 my $jobspec;
124 my $job_api_token;
125 my $no_clear_tmp;
126 my $resume_stash;
127 GetOptions('force-unlock' => \$force_unlock,
128            'git-dir=s' => \$git_dir,
129            'job=s' => \$jobspec,
130            'job-api-token=s' => \$job_api_token,
131            'no-clear-tmp' => \$no_clear_tmp,
132            'resume-stash=s' => \$resume_stash,
133     );
134
135 if (defined $job_api_token) {
136   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
137 }
138
139 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
140 my $local_job = 0;
141
142
143 $SIG{'USR1'} = sub
144 {
145   $main::ENV{CRUNCH_DEBUG} = 1;
146 };
147 $SIG{'USR2'} = sub
148 {
149   $main::ENV{CRUNCH_DEBUG} = 0;
150 };
151
152
153
154 my $arv = Arvados->new('apiVersion' => 'v1');
155
156 my $Job;
157 my $job_id;
158 my $dbh;
159 my $sth;
160 my @jobstep;
161
162 my $User = api_call("users/current");
163
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   if (!$force_unlock) {
169     # Claim this job, and make sure nobody else does
170     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
171     if ($@) {
172       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
173       exit EX_TEMPFAIL;
174     };
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189   $Job->{'state'} = 'Running';
190
191   $Job = api_call("jobs/create", job => $Job);
192 }
193 $job_id = $Job->{'uuid'};
194
195 my $keep_logfile = $job_id . '.log.txt';
196 log_writer_start($keep_logfile);
197
198 $Job->{'runtime_constraints'} ||= {};
199 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
200 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
201
202 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
203 if ($? == 0) {
204   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
205   chomp($gem_versions);
206   chop($gem_versions);  # Closing parentheses
207 } else {
208   $gem_versions = "";
209 }
210 Log(undef,
211     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
212
213 Log (undef, "check slurm allocation");
214 my @slot;
215 my @node;
216 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
217 my @sinfo;
218 if (!$have_slurm)
219 {
220   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
221   push @sinfo, "$localcpus localhost";
222 }
223 if (exists $ENV{SLURM_NODELIST})
224 {
225   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
226 }
227 foreach (@sinfo)
228 {
229   my ($ncpus, $slurm_nodelist) = split;
230   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
231
232   my @nodelist;
233   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
234   {
235     my $nodelist = $1;
236     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
237     {
238       my $ranges = $1;
239       foreach (split (",", $ranges))
240       {
241         my ($a, $b);
242         if (/(\d+)-(\d+)/)
243         {
244           $a = $1;
245           $b = $2;
246         }
247         else
248         {
249           $a = $_;
250           $b = $_;
251         }
252         push @nodelist, map {
253           my $n = $nodelist;
254           $n =~ s/\[[-,\d]+\]/$_/;
255           $n;
256         } ($a..$b);
257       }
258     }
259     else
260     {
261       push @nodelist, $nodelist;
262     }
263   }
264   foreach my $nodename (@nodelist)
265   {
266     Log (undef, "node $nodename - $ncpus slots");
267     my $node = { name => $nodename,
268                  ncpus => $ncpus,
269                  losing_streak => 0,
270                  hold_until => 0 };
271     foreach my $cpu (1..$ncpus)
272     {
273       push @slot, { node => $node,
274                     cpu => $cpu };
275     }
276   }
277   push @node, @nodelist;
278 }
279
280
281
282 # Ensure that we get one jobstep running on each allocated node before
283 # we start overloading nodes with concurrent steps
284
285 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
286
287
288 $Job->update_attributes(
289   'tasks_summary' => { 'failed' => 0,
290                        'todo' => 1,
291                        'running' => 0,
292                        'done' => 0 });
293
294 Log (undef, "start");
295 $SIG{'INT'} = sub { $main::please_freeze = 1; };
296 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
297 $SIG{'TERM'} = \&croak;
298 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
299 $SIG{'ALRM'} = sub { $main::please_info = 1; };
300 $SIG{'CONT'} = sub { $main::please_continue = 1; };
301 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
302
303 $main::please_freeze = 0;
304 $main::please_info = 0;
305 $main::please_continue = 0;
306 $main::please_refresh = 0;
307 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
308
309 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
310 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
311 $ENV{"JOB_UUID"} = $job_id;
312
313
314 my @jobstep_todo = ();
315 my @jobstep_done = ();
316 my @jobstep_tomerge = ();
317 my $jobstep_tomerge_level = 0;
318 my $squeue_checked;
319 my $squeue_kill_checked;
320 my $latest_refresh = scalar time;
321
322
323
324 if (defined $Job->{thawedfromkey})
325 {
326   thaw ($Job->{thawedfromkey});
327 }
328 else
329 {
330   my $first_task = api_call("job_tasks/create", job_task => {
331     'job_uuid' => $Job->{'uuid'},
332     'sequence' => 0,
333     'qsequence' => 0,
334     'parameters' => {},
335   });
336   push @jobstep, { 'level' => 0,
337                    'failures' => 0,
338                    'arvados_task' => $first_task,
339                  };
340   push @jobstep_todo, 0;
341 }
342
343
344 if (!$have_slurm)
345 {
346   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
347 }
348
349 my $build_script = handle_readall(\*DATA);
350 my $nodelist = join(",", @node);
351 my $git_tar_count = 0;
352
353 if (!defined $no_clear_tmp) {
354   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
355   Log (undef, "Clean work dirs");
356
357   my $cleanpid = fork();
358   if ($cleanpid == 0)
359   {
360     # Find FUSE mounts that look like Keep mounts (the mount path has the
361     # word "keep") and unmount them.  Then clean up work directories.
362     # TODO: When #5036 is done and widely deployed, we can get rid of the
363     # regular expression and just unmount everything with type fuse.keep.
364     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
365           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
366     exit (1);
367   }
368   while (1)
369   {
370     last if $cleanpid == waitpid (-1, WNOHANG);
371     freeze_if_want_freeze ($cleanpid);
372     select (undef, undef, undef, 0.1);
373   }
374   Log (undef, "Cleanup command exited ".exit_status_s($?));
375 }
376
377 # If this job requires a Docker image, install that.
378 my $docker_bin = "/usr/bin/docker.io";
379 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
380 if ($docker_locator = $Job->{docker_image_locator}) {
381   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
382   if (!$docker_hash)
383   {
384     croak("No Docker image hash found from locator $docker_locator");
385   }
386   $docker_stream =~ s/^\.//;
387   my $docker_install_script = qq{
388 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
389     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
390 fi
391 };
392   my $docker_pid = fork();
393   if ($docker_pid == 0)
394   {
395     srun (["srun", "--nodelist=" . join(',', @node)],
396           ["/bin/sh", "-ec", $docker_install_script]);
397     exit ($?);
398   }
399   while (1)
400   {
401     last if $docker_pid == waitpid (-1, WNOHANG);
402     freeze_if_want_freeze ($docker_pid);
403     select (undef, undef, undef, 0.1);
404   }
405   if ($? != 0)
406   {
407     croak("Installing Docker image from $docker_locator exited "
408           .exit_status_s($?));
409   }
410
411   # Determine whether this version of Docker supports memory+swap limits.
412   srun(["srun", "--nodelist=" . $node[0]],
413        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
414       {fork => 1});
415   $docker_limitmem = ($? == 0);
416
417   if ($Job->{arvados_sdk_version}) {
418     # The job also specifies an Arvados SDK version.  Add the SDKs to the
419     # tar file for the build script to install.
420     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
421                        $Job->{arvados_sdk_version}));
422     add_git_archive("git", "--git-dir=$git_dir", "archive",
423                     "--prefix=.arvados.sdk/",
424                     $Job->{arvados_sdk_version}, "sdk");
425   }
426 }
427
428 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
429   # If script_version looks like an absolute path, *and* the --git-dir
430   # argument was not given -- which implies we were not invoked by
431   # crunch-dispatch -- we will use the given path as a working
432   # directory instead of resolving script_version to a git commit (or
433   # doing anything else with git).
434   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
435   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
436 }
437 else {
438   # Resolve the given script_version to a git commit sha1. Also, if
439   # the repository is remote, clone it into our local filesystem: this
440   # ensures "git archive" will work, and is necessary to reliably
441   # resolve a symbolic script_version like "master^".
442   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
443
444   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
445
446   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
447
448   # If we're running under crunch-dispatch, it will have already
449   # pulled the appropriate source tree into its own repository, and
450   # given us that repo's path as $git_dir.
451   #
452   # If we're running a "local" job, we might have to fetch content
453   # from a remote repository.
454   #
455   # (Currently crunch-dispatch gives a local path with --git-dir, but
456   # we might as well accept URLs there too in case it changes its
457   # mind.)
458   my $repo = $git_dir || $Job->{'repository'};
459
460   # Repository can be remote or local. If remote, we'll need to fetch it
461   # to a local dir before doing `git log` et al.
462   my $repo_location;
463
464   if ($repo =~ m{://|^[^/]*:}) {
465     # $repo is a git url we can clone, like git:// or https:// or
466     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
467     # not recognized here because distinguishing that from a local
468     # path is too fragile. If you really need something strange here,
469     # use the ssh:// form.
470     $repo_location = 'remote';
471   } elsif ($repo =~ m{^\.*/}) {
472     # $repo is a local path to a git index. We'll also resolve ../foo
473     # to ../foo/.git if the latter is a directory. To help
474     # disambiguate local paths from named hosted repositories, this
475     # form must be given as ./ or ../ if it's a relative path.
476     if (-d "$repo/.git") {
477       $repo = "$repo/.git";
478     }
479     $repo_location = 'local';
480   } else {
481     # $repo is none of the above. It must be the name of a hosted
482     # repository.
483     my $arv_repo_list = api_call("repositories/list",
484                                  'filters' => [['name','=',$repo]]);
485     my @repos_found = @{$arv_repo_list->{'items'}};
486     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
487     if ($n_found > 0) {
488       Log(undef, "Repository '$repo' -> "
489           . join(", ", map { $_->{'uuid'} } @repos_found));
490     }
491     if ($n_found != 1) {
492       croak("Error: Found $n_found repositories with name '$repo'.");
493     }
494     $repo = $repos_found[0]->{'fetch_url'};
495     $repo_location = 'remote';
496   }
497   Log(undef, "Using $repo_location repository '$repo'");
498   $ENV{"CRUNCH_SRC_URL"} = $repo;
499
500   # Resolve given script_version (we'll call that $treeish here) to a
501   # commit sha1 ($commit).
502   my $treeish = $Job->{'script_version'};
503   my $commit;
504   if ($repo_location eq 'remote') {
505     # We minimize excess object-fetching by re-using the same bare
506     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
507     # just keep adding remotes to it as needed.
508     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
509     my $gitcmd = "git --git-dir=\Q$local_repo\E";
510
511     # Set up our local repo for caching remote objects, making
512     # archives, etc.
513     if (!-d $local_repo) {
514       make_path($local_repo) or croak("Error: could not create $local_repo");
515     }
516     # This works (exits 0 and doesn't delete fetched objects) even
517     # if $local_repo is already initialized:
518     `$gitcmd init --bare`;
519     if ($?) {
520       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
521     }
522
523     # If $treeish looks like a hash (or abbrev hash) we look it up in
524     # our local cache first, since that's cheaper. (We don't want to
525     # do that with tags/branches though -- those change over time, so
526     # they should always be resolved by the remote repo.)
527     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
528       # Hide stderr because it's normal for this to fail:
529       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
530       if ($? == 0 &&
531           # Careful not to resolve a branch named abcdeff to commit 1234567:
532           $sha1 =~ /^$treeish/ &&
533           $sha1 =~ /^([0-9a-f]{40})$/s) {
534         $commit = $1;
535         Log(undef, "Commit $commit already present in $local_repo");
536       }
537     }
538
539     if (!defined $commit) {
540       # If $treeish isn't just a hash or abbrev hash, or isn't here
541       # yet, we need to fetch the remote to resolve it correctly.
542
543       # First, remove all local heads. This prevents a name that does
544       # not exist on the remote from resolving to (or colliding with)
545       # a previously fetched branch or tag (possibly from a different
546       # remote).
547       remove_tree("$local_repo/refs/heads", {keep_root => 1});
548
549       Log(undef, "Fetching objects from $repo to $local_repo");
550       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
551       if ($?) {
552         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
553       }
554     }
555
556     # Now that the data is all here, we will use our local repo for
557     # the rest of our git activities.
558     $repo = $local_repo;
559   }
560
561   my $gitcmd = "git --git-dir=\Q$repo\E";
562   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
563   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
564     croak("`$gitcmd rev-list` exited "
565           .exit_status_s($?)
566           .", '$treeish' not found. Giving up.");
567   }
568   $commit = $1;
569   Log(undef, "Version $treeish is commit $commit");
570
571   if ($commit ne $Job->{'script_version'}) {
572     # Record the real commit id in the database, frozentokey, logs,
573     # etc. -- instead of an abbreviation or a branch name which can
574     # become ambiguous or point to a different commit in the future.
575     if (!$Job->update_attributes('script_version' => $commit)) {
576       croak("Error: failed to update job's script_version attribute");
577     }
578   }
579
580   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
581   add_git_archive("$gitcmd archive ''\Q$commit\E");
582 }
583
584 my $git_archive = combined_git_archive();
585 if (!defined $git_archive) {
586   Log(undef, "Skip install phase (no git archive)");
587   if ($have_slurm) {
588     Log(undef, "Warning: This probably means workers have no source tree!");
589   }
590 }
591 else {
592   Log(undef, "Run install script on all workers");
593
594   my @srunargs = ("srun",
595                   "--nodelist=$nodelist",
596                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
597   my @execargs = ("sh", "-c",
598                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
599
600   my $installpid = fork();
601   if ($installpid == 0)
602   {
603     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
604     exit (1);
605   }
606   while (1)
607   {
608     last if $installpid == waitpid (-1, WNOHANG);
609     freeze_if_want_freeze ($installpid);
610     select (undef, undef, undef, 0.1);
611   }
612   my $install_exited = $?;
613   Log (undef, "Install script exited ".exit_status_s($install_exited));
614   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
615     unlink($tar_filename);
616   }
617   exit (1) if $install_exited != 0;
618 }
619
620 foreach (qw (script script_version script_parameters runtime_constraints))
621 {
622   Log (undef,
623        "$_ " .
624        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
625 }
626 foreach (split (/\n/, $Job->{knobs}))
627 {
628   Log (undef, "knob " . $_);
629 }
630
631
632
633 $main::success = undef;
634
635
636
637 ONELEVEL:
638
639 my $thisround_succeeded = 0;
640 my $thisround_failed = 0;
641 my $thisround_failed_multiple = 0;
642
643 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
644                        or $a <=> $b } @jobstep_todo;
645 my $level = $jobstep[$jobstep_todo[0]]->{level};
646 Log (undef, "start level $level");
647
648
649
650 my %proc;
651 my @freeslot = (0..$#slot);
652 my @holdslot;
653 my %reader;
654 my $progress_is_dirty = 1;
655 my $progress_stats_updated = 0;
656
657 update_progress_stats();
658
659
660
661 THISROUND:
662 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
663 {
664   my $id = $jobstep_todo[$todo_ptr];
665   my $Jobstep = $jobstep[$id];
666   if ($Jobstep->{level} != $level)
667   {
668     next;
669   }
670
671   pipe $reader{$id}, "writer" or croak ($!);
672   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
673   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
674
675   my $childslot = $freeslot[0];
676   my $childnode = $slot[$childslot]->{node};
677   my $childslotname = join (".",
678                             $slot[$childslot]->{node}->{name},
679                             $slot[$childslot]->{cpu});
680
681   my $childpid = fork();
682   if ($childpid == 0)
683   {
684     $SIG{'INT'} = 'DEFAULT';
685     $SIG{'QUIT'} = 'DEFAULT';
686     $SIG{'TERM'} = 'DEFAULT';
687
688     foreach (values (%reader))
689     {
690       close($_);
691     }
692     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
693     open(STDOUT,">&writer");
694     open(STDERR,">&writer");
695
696     undef $dbh;
697     undef $sth;
698
699     delete $ENV{"GNUPGHOME"};
700     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
701     $ENV{"TASK_QSEQUENCE"} = $id;
702     $ENV{"TASK_SEQUENCE"} = $level;
703     $ENV{"JOB_SCRIPT"} = $Job->{script};
704     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
705       $param =~ tr/a-z/A-Z/;
706       $ENV{"JOB_PARAMETER_$param"} = $value;
707     }
708     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
709     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
710     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
711     $ENV{"HOME"} = $ENV{"TASK_WORK"};
712     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
713     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
714     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
715     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
716
717     $ENV{"GZIP"} = "-n";
718
719     my @srunargs = (
720       "srun",
721       "--nodelist=".$childnode->{name},
722       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
723       "--job-name=$job_id.$id.$$",
724         );
725     my $command =
726         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
727         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
728         ."&& cd $ENV{CRUNCH_TMP} "
729         # These environment variables get used explicitly later in
730         # $command.  No tool is expected to read these values directly.
731         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
732         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
733         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
734         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
735     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
736     if ($docker_hash)
737     {
738       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
739       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
740       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
741       # We only set memory limits if Docker lets us limit both memory and swap.
742       # Memory limits alone have been supported longer, but subprocesses tend
743       # to get SIGKILL if they exceed that without any swap limit set.
744       # See #5642 for additional background.
745       if ($docker_limitmem) {
746         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
747       }
748
749       # Dynamically configure the container to use the host system as its
750       # DNS server.  Get the host's global addresses from the ip command,
751       # and turn them into docker --dns options using gawk.
752       $command .=
753           q{$(ip -o address show scope global |
754               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
755
756       # The source tree and $destdir directory (which we have
757       # installed on the worker host) are available in the container,
758       # under the same path.
759       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
760       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
761
762       # Currently, we make arv-mount's mount point appear at /keep
763       # inside the container (instead of using the same path as the
764       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
765       # crunch scripts and utilities must not rely on this. They must
766       # use $TASK_KEEPMOUNT.
767       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
768       $ENV{TASK_KEEPMOUNT} = "/keep";
769
770       # TASK_WORK is almost exactly like a docker data volume: it
771       # starts out empty, is writable, and persists until no
772       # containers use it any more. We don't use --volumes-from to
773       # share it with other containers: it is only accessible to this
774       # task, and it goes away when this task stops.
775       #
776       # However, a docker data volume is writable only by root unless
777       # the mount point already happens to exist in the container with
778       # different permissions. Therefore, we [1] assume /tmp already
779       # exists in the image and is writable by the crunch user; [2]
780       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
781       # writable if they are created by docker while setting up the
782       # other --volumes); and [3] create $TASK_WORK inside the
783       # container using $build_script.
784       $command .= "--volume=/tmp ";
785       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
786       $ENV{"HOME"} = $ENV{"TASK_WORK"};
787       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
788
789       # TODO: Share a single JOB_WORK volume across all task
790       # containers on a given worker node, and delete it when the job
791       # ends (and, in case that doesn't work, when the next job
792       # starts).
793       #
794       # For now, use the same approach as TASK_WORK above.
795       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
796
797       while (my ($env_key, $env_val) = each %ENV)
798       {
799         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
800           $command .= "--env=\Q$env_key=$env_val\E ";
801         }
802       }
803       $command .= "--env=\QHOME=$ENV{HOME}\E ";
804       $command .= "\Q$docker_hash\E ";
805       $command .= "stdbuf --output=0 --error=0 ";
806       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
807     } else {
808       # Non-docker run
809       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
810       $command .= "stdbuf --output=0 --error=0 ";
811       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
812     }
813
814     my @execargs = ('bash', '-c', $command);
815     srun (\@srunargs, \@execargs, undef, $build_script);
816     # exec() failed, we assume nothing happened.
817     die "srun() failed on build script\n";
818   }
819   close("writer");
820   if (!defined $childpid)
821   {
822     close $reader{$id};
823     delete $reader{$id};
824     next;
825   }
826   shift @freeslot;
827   $proc{$childpid} = { jobstep => $id,
828                        time => time,
829                        slot => $childslot,
830                        jobstepname => "$job_id.$id.$childpid",
831                      };
832   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
833   $slot[$childslot]->{pid} = $childpid;
834
835   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
836   Log ($id, "child $childpid started on $childslotname");
837   $Jobstep->{starttime} = time;
838   $Jobstep->{node} = $childnode->{name};
839   $Jobstep->{slotindex} = $childslot;
840   delete $Jobstep->{stderr};
841   delete $Jobstep->{finishtime};
842
843   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
844   $Jobstep->{'arvados_task'}->save;
845
846   splice @jobstep_todo, $todo_ptr, 1;
847   --$todo_ptr;
848
849   $progress_is_dirty = 1;
850
851   while (!@freeslot
852          ||
853          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
854   {
855     last THISROUND if $main::please_freeze || defined($main::success);
856     if ($main::please_info)
857     {
858       $main::please_info = 0;
859       freeze();
860       create_output_collection();
861       save_meta(1);
862       update_progress_stats();
863     }
864     my $gotsome
865         = readfrompipes ()
866         + reapchildren ();
867     if (!$gotsome)
868     {
869       check_refresh_wanted();
870       check_squeue();
871       update_progress_stats();
872       select (undef, undef, undef, 0.1);
873     }
874     elsif (time - $progress_stats_updated >= 30)
875     {
876       update_progress_stats();
877     }
878     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
879         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
880     {
881       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
882           .($thisround_failed+$thisround_succeeded)
883           .") -- giving up on this round";
884       Log (undef, $message);
885       last THISROUND;
886     }
887
888     # move slots from freeslot to holdslot (or back to freeslot) if necessary
889     for (my $i=$#freeslot; $i>=0; $i--) {
890       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
891         push @holdslot, (splice @freeslot, $i, 1);
892       }
893     }
894     for (my $i=$#holdslot; $i>=0; $i--) {
895       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
896         push @freeslot, (splice @holdslot, $i, 1);
897       }
898     }
899
900     # give up if no nodes are succeeding
901     if (!grep { $_->{node}->{losing_streak} == 0 &&
902                     $_->{node}->{hold_count} < 4 } @slot) {
903       my $message = "Every node has failed -- giving up on this round";
904       Log (undef, $message);
905       last THISROUND;
906     }
907   }
908 }
909
910
911 push @freeslot, splice @holdslot;
912 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
913
914
915 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
916 while (%proc)
917 {
918   if ($main::please_continue) {
919     $main::please_continue = 0;
920     goto THISROUND;
921   }
922   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
923   readfrompipes ();
924   if (!reapchildren())
925   {
926     check_refresh_wanted();
927     check_squeue();
928     update_progress_stats();
929     select (undef, undef, undef, 0.1);
930     killem (keys %proc) if $main::please_freeze;
931   }
932 }
933
934 update_progress_stats();
935 freeze_if_want_freeze();
936
937
938 if (!defined $main::success)
939 {
940   if (@jobstep_todo &&
941       $thisround_succeeded == 0 &&
942       ($thisround_failed == 0 || $thisround_failed > 4))
943   {
944     my $message = "stop because $thisround_failed tasks failed and none succeeded";
945     Log (undef, $message);
946     $main::success = 0;
947   }
948   if (!@jobstep_todo)
949   {
950     $main::success = 1;
951   }
952 }
953
954 goto ONELEVEL if !defined $main::success;
955
956
957 release_allocation();
958 freeze();
959 my $collated_output = &create_output_collection();
960
961 if (!$collated_output) {
962   Log (undef, "Failed to write output collection");
963 }
964 else {
965   Log(undef, "job output $collated_output");
966   $Job->update_attributes('output' => $collated_output);
967 }
968
969 Log (undef, "finish");
970
971 save_meta();
972
973 my $final_state;
974 if ($collated_output && $main::success) {
975   $final_state = 'Complete';
976 } else {
977   $final_state = 'Failed';
978 }
979 $Job->update_attributes('state' => $final_state);
980
981 exit (($final_state eq 'Complete') ? 0 : 1);
982
983
984
985 sub update_progress_stats
986 {
987   $progress_stats_updated = time;
988   return if !$progress_is_dirty;
989   my ($todo, $done, $running) = (scalar @jobstep_todo,
990                                  scalar @jobstep_done,
991                                  scalar @slot - scalar @freeslot - scalar @holdslot);
992   $Job->{'tasks_summary'} ||= {};
993   $Job->{'tasks_summary'}->{'todo'} = $todo;
994   $Job->{'tasks_summary'}->{'done'} = $done;
995   $Job->{'tasks_summary'}->{'running'} = $running;
996   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
997   Log (undef, "status: $done done, $running running, $todo todo");
998   $progress_is_dirty = 0;
999 }
1000
1001
1002
1003 sub reapchildren
1004 {
1005   my $pid = waitpid (-1, WNOHANG);
1006   return 0 if $pid <= 0;
1007
1008   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1009                   . "."
1010                   . $slot[$proc{$pid}->{slot}]->{cpu});
1011   my $jobstepid = $proc{$pid}->{jobstep};
1012   my $elapsed = time - $proc{$pid}->{time};
1013   my $Jobstep = $jobstep[$jobstepid];
1014
1015   my $childstatus = $?;
1016   my $exitvalue = $childstatus >> 8;
1017   my $exitinfo = "exit ".exit_status_s($childstatus);
1018   $Jobstep->{'arvados_task'}->reload;
1019   my $task_success = $Jobstep->{'arvados_task'}->{success};
1020
1021   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1022
1023   if (!defined $task_success) {
1024     # task did not indicate one way or the other --> fail
1025     $Jobstep->{'arvados_task'}->{success} = 0;
1026     $Jobstep->{'arvados_task'}->save;
1027     $task_success = 0;
1028   }
1029
1030   if (!$task_success)
1031   {
1032     my $temporary_fail;
1033     $temporary_fail ||= $Jobstep->{node_fail};
1034     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1035
1036     ++$thisround_failed;
1037     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1038
1039     # Check for signs of a failed or misconfigured node
1040     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1041         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1042       # Don't count this against jobstep failure thresholds if this
1043       # node is already suspected faulty and srun exited quickly
1044       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1045           $elapsed < 5) {
1046         Log ($jobstepid, "blaming failure on suspect node " .
1047              $slot[$proc{$pid}->{slot}]->{node}->{name});
1048         $temporary_fail ||= 1;
1049       }
1050       ban_node_by_slot($proc{$pid}->{slot});
1051     }
1052
1053     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1054                              ++$Jobstep->{'failures'},
1055                              $temporary_fail ? 'temporary ' : 'permanent',
1056                              $elapsed));
1057
1058     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1059       # Give up on this task, and the whole job
1060       $main::success = 0;
1061     }
1062     # Put this task back on the todo queue
1063     push @jobstep_todo, $jobstepid;
1064     $Job->{'tasks_summary'}->{'failed'}++;
1065   }
1066   else
1067   {
1068     ++$thisround_succeeded;
1069     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1070     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1071     push @jobstep_done, $jobstepid;
1072     Log ($jobstepid, "success in $elapsed seconds");
1073   }
1074   $Jobstep->{exitcode} = $childstatus;
1075   $Jobstep->{finishtime} = time;
1076   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1077   $Jobstep->{'arvados_task'}->save;
1078   process_stderr ($jobstepid, $task_success);
1079   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1080                            length($Jobstep->{'arvados_task'}->{output}),
1081                            $Jobstep->{'arvados_task'}->{output}));
1082
1083   close $reader{$jobstepid};
1084   delete $reader{$jobstepid};
1085   delete $slot[$proc{$pid}->{slot}]->{pid};
1086   push @freeslot, $proc{$pid}->{slot};
1087   delete $proc{$pid};
1088
1089   if ($task_success) {
1090     # Load new tasks
1091     my $newtask_list = [];
1092     my $newtask_results;
1093     do {
1094       $newtask_results = api_call(
1095         "job_tasks/list",
1096         'where' => {
1097           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1098         },
1099         'order' => 'qsequence',
1100         'offset' => scalar(@$newtask_list),
1101       );
1102       push(@$newtask_list, @{$newtask_results->{items}});
1103     } while (@{$newtask_results->{items}});
1104     foreach my $arvados_task (@$newtask_list) {
1105       my $jobstep = {
1106         'level' => $arvados_task->{'sequence'},
1107         'failures' => 0,
1108         'arvados_task' => $arvados_task
1109       };
1110       push @jobstep, $jobstep;
1111       push @jobstep_todo, $#jobstep;
1112     }
1113   }
1114
1115   $progress_is_dirty = 1;
1116   1;
1117 }
1118
1119 sub check_refresh_wanted
1120 {
1121   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1122   if (@stat && $stat[9] > $latest_refresh) {
1123     $latest_refresh = scalar time;
1124     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1125     for my $attr ('cancelled_at',
1126                   'cancelled_by_user_uuid',
1127                   'cancelled_by_client_uuid',
1128                   'state') {
1129       $Job->{$attr} = $Job2->{$attr};
1130     }
1131     if ($Job->{'state'} ne "Running") {
1132       if ($Job->{'state'} eq "Cancelled") {
1133         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1134       } else {
1135         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1136       }
1137       $main::success = 0;
1138       $main::please_freeze = 1;
1139     }
1140   }
1141 }
1142
1143 sub check_squeue
1144 {
1145   # return if the kill list was checked <4 seconds ago
1146   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1147   {
1148     return;
1149   }
1150   $squeue_kill_checked = time;
1151
1152   # use killem() on procs whose killtime is reached
1153   for (keys %proc)
1154   {
1155     if (exists $proc{$_}->{killtime}
1156         && $proc{$_}->{killtime} <= time)
1157     {
1158       killem ($_);
1159     }
1160   }
1161
1162   # return if the squeue was checked <60 seconds ago
1163   if (defined $squeue_checked && $squeue_checked > time - 60)
1164   {
1165     return;
1166   }
1167   $squeue_checked = time;
1168
1169   if (!$have_slurm)
1170   {
1171     # here is an opportunity to check for mysterious problems with local procs
1172     return;
1173   }
1174
1175   # get a list of steps still running
1176   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1177   chop @squeue;
1178   if ($squeue[-1] ne "ok")
1179   {
1180     return;
1181   }
1182   pop @squeue;
1183
1184   # which of my jobsteps are running, according to squeue?
1185   my %ok;
1186   foreach (@squeue)
1187   {
1188     if (/^(\d+)\.(\d+) (\S+)/)
1189     {
1190       if ($1 eq $ENV{SLURM_JOBID})
1191       {
1192         $ok{$3} = 1;
1193       }
1194     }
1195   }
1196
1197   # which of my active child procs (>60s old) were not mentioned by squeue?
1198   foreach (keys %proc)
1199   {
1200     if ($proc{$_}->{time} < time - 60
1201         && !exists $ok{$proc{$_}->{jobstepname}}
1202         && !exists $proc{$_}->{killtime})
1203     {
1204       # kill this proc if it hasn't exited in 30 seconds
1205       $proc{$_}->{killtime} = time + 30;
1206     }
1207   }
1208 }
1209
1210
1211 sub release_allocation
1212 {
1213   if ($have_slurm)
1214   {
1215     Log (undef, "release job allocation");
1216     system "scancel $ENV{SLURM_JOBID}";
1217   }
1218 }
1219
1220
1221 sub readfrompipes
1222 {
1223   my $gotsome = 0;
1224   foreach my $job (keys %reader)
1225   {
1226     my $buf;
1227     while (0 < sysread ($reader{$job}, $buf, 8192))
1228     {
1229       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1230       $jobstep[$job]->{stderr} .= $buf;
1231       preprocess_stderr ($job);
1232       if (length ($jobstep[$job]->{stderr}) > 16384)
1233       {
1234         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1235       }
1236       $gotsome = 1;
1237     }
1238   }
1239   return $gotsome;
1240 }
1241
1242
1243 sub preprocess_stderr
1244 {
1245   my $job = shift;
1246
1247   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1248     my $line = $1;
1249     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1250     Log ($job, "stderr $line");
1251     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1252       # whoa.
1253       $main::please_freeze = 1;
1254     }
1255     elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1256       $jobstep[$job]->{node_fail} = 1;
1257       ban_node_by_slot($jobstep[$job]->{slotindex});
1258     }
1259   }
1260 }
1261
1262
1263 sub process_stderr
1264 {
1265   my $job = shift;
1266   my $task_success = shift;
1267   preprocess_stderr ($job);
1268
1269   map {
1270     Log ($job, "stderr $_");
1271   } split ("\n", $jobstep[$job]->{stderr});
1272 }
1273
1274 sub fetch_block
1275 {
1276   my $hash = shift;
1277   my $keep;
1278   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1279     Log(undef, "fetch_block run error from arv-get $hash: $!");
1280     return undef;
1281   }
1282   my $output_block = "";
1283   while (1) {
1284     my $buf;
1285     my $bytes = sysread($keep, $buf, 1024 * 1024);
1286     if (!defined $bytes) {
1287       Log(undef, "fetch_block read error from arv-get: $!");
1288       $output_block = undef;
1289       last;
1290     } elsif ($bytes == 0) {
1291       # sysread returns 0 at the end of the pipe.
1292       last;
1293     } else {
1294       # some bytes were read into buf.
1295       $output_block .= $buf;
1296     }
1297   }
1298   close $keep;
1299   if ($?) {
1300     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1301     $output_block = undef;
1302   }
1303   return $output_block;
1304 }
1305
1306 # Create a collection by concatenating the output of all tasks (each
1307 # task's output is either a manifest fragment, a locator for a
1308 # manifest fragment stored in Keep, or nothing at all). Return the
1309 # portable_data_hash of the new collection.
1310 sub create_output_collection
1311 {
1312   Log (undef, "collate");
1313
1314   my ($child_out, $child_in);
1315   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1316 import arvados
1317 import sys
1318 print (arvados.api("v1").collections().
1319        create(body={"manifest_text": sys.stdin.read()}).
1320        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1321 }, retry_count());
1322
1323   my $task_idx = -1;
1324   my $manifest_size = 0;
1325   for (@jobstep)
1326   {
1327     ++$task_idx;
1328     my $output = $_->{'arvados_task'}->{output};
1329     next if (!defined($output));
1330     my $next_write;
1331     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1332       $next_write = fetch_block($output);
1333     } else {
1334       $next_write = $output;
1335     }
1336     if (defined($next_write)) {
1337       if (!defined(syswrite($child_in, $next_write))) {
1338         # There's been an error writing.  Stop the loop.
1339         # We'll log details about the exit code later.
1340         last;
1341       } else {
1342         $manifest_size += length($next_write);
1343       }
1344     } else {
1345       my $uuid = $_->{'arvados_task'}->{'uuid'};
1346       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1347       $main::success = 0;
1348     }
1349   }
1350   close($child_in);
1351   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1352
1353   my $joboutput;
1354   my $s = IO::Select->new($child_out);
1355   if ($s->can_read(120)) {
1356     sysread($child_out, $joboutput, 1024 * 1024);
1357     waitpid($pid, 0);
1358     if ($?) {
1359       Log(undef, "output collection creation exited " . exit_status_s($?));
1360       $joboutput = undef;
1361     } else {
1362       chomp($joboutput);
1363     }
1364   } else {
1365     Log (undef, "timed out while creating output collection");
1366     foreach my $signal (2, 2, 2, 15, 15, 9) {
1367       kill($signal, $pid);
1368       last if waitpid($pid, WNOHANG) == -1;
1369       sleep(1);
1370     }
1371   }
1372   close($child_out);
1373
1374   return $joboutput;
1375 }
1376
1377
1378 sub killem
1379 {
1380   foreach (@_)
1381   {
1382     my $sig = 2;                # SIGINT first
1383     if (exists $proc{$_}->{"sent_$sig"} &&
1384         time - $proc{$_}->{"sent_$sig"} > 4)
1385     {
1386       $sig = 15;                # SIGTERM if SIGINT doesn't work
1387     }
1388     if (exists $proc{$_}->{"sent_$sig"} &&
1389         time - $proc{$_}->{"sent_$sig"} > 4)
1390     {
1391       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1392     }
1393     if (!exists $proc{$_}->{"sent_$sig"})
1394     {
1395       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1396       kill $sig, $_;
1397       select (undef, undef, undef, 0.1);
1398       if ($sig == 2)
1399       {
1400         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1401       }
1402       $proc{$_}->{"sent_$sig"} = time;
1403       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1404     }
1405   }
1406 }
1407
1408
1409 sub fhbits
1410 {
1411   my($bits);
1412   for (@_) {
1413     vec($bits,fileno($_),1) = 1;
1414   }
1415   $bits;
1416 }
1417
1418
1419 # Send log output to Keep via arv-put.
1420 #
1421 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1422 # $log_pipe_pid is the pid of the arv-put subprocess.
1423 #
1424 # The only functions that should access these variables directly are:
1425 #
1426 # log_writer_start($logfilename)
1427 #     Starts an arv-put pipe, reading data on stdin and writing it to
1428 #     a $logfilename file in an output collection.
1429 #
1430 # log_writer_send($txt)
1431 #     Writes $txt to the output log collection.
1432 #
1433 # log_writer_finish()
1434 #     Closes the arv-put pipe and returns the output that it produces.
1435 #
1436 # log_writer_is_active()
1437 #     Returns a true value if there is currently a live arv-put
1438 #     process, false otherwise.
1439 #
1440 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1441
1442 sub log_writer_start($)
1443 {
1444   my $logfilename = shift;
1445   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1446                         'arv-put',
1447                         '--portable-data-hash',
1448                         '--project-uuid', $Job->{owner_uuid},
1449                         '--retries', '3',
1450                         '--name', $logfilename,
1451                         '--filename', $logfilename,
1452                         '-');
1453 }
1454
1455 sub log_writer_send($)
1456 {
1457   my $txt = shift;
1458   print $log_pipe_in $txt;
1459 }
1460
1461 sub log_writer_finish()
1462 {
1463   return unless $log_pipe_pid;
1464
1465   close($log_pipe_in);
1466   my $arv_put_output;
1467
1468   my $s = IO::Select->new($log_pipe_out);
1469   if ($s->can_read(120)) {
1470     sysread($log_pipe_out, $arv_put_output, 1024);
1471     chomp($arv_put_output);
1472   } else {
1473     Log (undef, "timed out reading from 'arv-put'");
1474   }
1475
1476   waitpid($log_pipe_pid, 0);
1477   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1478   if ($?) {
1479     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1480   }
1481
1482   return $arv_put_output;
1483 }
1484
1485 sub log_writer_is_active() {
1486   return $log_pipe_pid;
1487 }
1488
1489 sub Log                         # ($jobstep_id, $logmessage)
1490 {
1491   if ($_[1] =~ /\n/) {
1492     for my $line (split (/\n/, $_[1])) {
1493       Log ($_[0], $line);
1494     }
1495     return;
1496   }
1497   my $fh = select STDERR; $|=1; select $fh;
1498   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1499   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1500   $message .= "\n";
1501   my $datetime;
1502   if (log_writer_is_active() || -t STDERR) {
1503     my @gmtime = gmtime;
1504     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1505                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1506   }
1507   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1508
1509   if (log_writer_is_active()) {
1510     log_writer_send($datetime . " " . $message);
1511   }
1512 }
1513
1514
1515 sub croak
1516 {
1517   my ($package, $file, $line) = caller;
1518   my $message = "@_ at $file line $line\n";
1519   Log (undef, $message);
1520   freeze() if @jobstep_todo;
1521   create_output_collection() if @jobstep_todo;
1522   cleanup();
1523   save_meta();
1524   die;
1525 }
1526
1527
1528 sub cleanup
1529 {
1530   return unless $Job;
1531   if ($Job->{'state'} eq 'Cancelled') {
1532     $Job->update_attributes('finished_at' => scalar gmtime);
1533   } else {
1534     $Job->update_attributes('state' => 'Failed');
1535   }
1536 }
1537
1538
1539 sub save_meta
1540 {
1541   my $justcheckpoint = shift; # false if this will be the last meta saved
1542   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1543   return unless log_writer_is_active();
1544
1545   my $loglocator = log_writer_finish();
1546   Log (undef, "log manifest is $loglocator");
1547   $Job->{'log'} = $loglocator;
1548   $Job->update_attributes('log', $loglocator);
1549 }
1550
1551
1552 sub freeze_if_want_freeze
1553 {
1554   if ($main::please_freeze)
1555   {
1556     release_allocation();
1557     if (@_)
1558     {
1559       # kill some srun procs before freeze+stop
1560       map { $proc{$_} = {} } @_;
1561       while (%proc)
1562       {
1563         killem (keys %proc);
1564         select (undef, undef, undef, 0.1);
1565         my $died;
1566         while (($died = waitpid (-1, WNOHANG)) > 0)
1567         {
1568           delete $proc{$died};
1569         }
1570       }
1571     }
1572     freeze();
1573     create_output_collection();
1574     cleanup();
1575     save_meta();
1576     exit 1;
1577   }
1578 }
1579
1580
1581 sub freeze
1582 {
1583   Log (undef, "Freeze not implemented");
1584   return;
1585 }
1586
1587
1588 sub thaw
1589 {
1590   croak ("Thaw not implemented");
1591 }
1592
1593
1594 sub freezequote
1595 {
1596   my $s = shift;
1597   $s =~ s/\\/\\\\/g;
1598   $s =~ s/\n/\\n/g;
1599   return $s;
1600 }
1601
1602
1603 sub freezeunquote
1604 {
1605   my $s = shift;
1606   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1607   return $s;
1608 }
1609
1610
1611 sub srun
1612 {
1613   my $srunargs = shift;
1614   my $execargs = shift;
1615   my $opts = shift || {};
1616   my $stdin = shift;
1617   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1618
1619   $Data::Dumper::Terse = 1;
1620   $Data::Dumper::Indent = 0;
1621   my $show_cmd = Dumper($args);
1622   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1623   $show_cmd =~ s/\n/ /g;
1624   warn "starting: $show_cmd\n";
1625
1626   if (defined $stdin) {
1627     my $child = open STDIN, "-|";
1628     defined $child or die "no fork: $!";
1629     if ($child == 0) {
1630       print $stdin or die $!;
1631       close STDOUT or die $!;
1632       exit 0;
1633     }
1634   }
1635
1636   return system (@$args) if $opts->{fork};
1637
1638   exec @$args;
1639   warn "ENV size is ".length(join(" ",%ENV));
1640   die "exec failed: $!: @$args";
1641 }
1642
1643
1644 sub ban_node_by_slot {
1645   # Don't start any new jobsteps on this node for 60 seconds
1646   my $slotid = shift;
1647   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1648   $slot[$slotid]->{node}->{hold_count}++;
1649   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1650 }
1651
1652 sub must_lock_now
1653 {
1654   my ($lockfile, $error_message) = @_;
1655   open L, ">", $lockfile or croak("$lockfile: $!");
1656   if (!flock L, LOCK_EX|LOCK_NB) {
1657     croak("Can't lock $lockfile: $error_message\n");
1658   }
1659 }
1660
1661 sub find_docker_image {
1662   # Given a Keep locator, check to see if it contains a Docker image.
1663   # If so, return its stream name and Docker hash.
1664   # If not, return undef for both values.
1665   my $locator = shift;
1666   my ($streamname, $filename);
1667   my $image = api_call("collections/get", uuid => $locator);
1668   if ($image) {
1669     foreach my $line (split(/\n/, $image->{manifest_text})) {
1670       my @tokens = split(/\s+/, $line);
1671       next if (!@tokens);
1672       $streamname = shift(@tokens);
1673       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1674         if (defined($filename)) {
1675           return (undef, undef);  # More than one file in the Collection.
1676         } else {
1677           $filename = (split(/:/, $filedata, 3))[2];
1678         }
1679       }
1680     }
1681   }
1682   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1683     return ($streamname, $1);
1684   } else {
1685     return (undef, undef);
1686   }
1687 }
1688
1689 sub retry_count {
1690   # Calculate the number of times an operation should be retried,
1691   # assuming exponential backoff, and that we're willing to retry as
1692   # long as tasks have been running.  Enforce a minimum of 3 retries.
1693   my ($starttime, $endtime, $timediff, $retries);
1694   if (@jobstep) {
1695     $starttime = $jobstep[0]->{starttime};
1696     $endtime = $jobstep[-1]->{finishtime};
1697   }
1698   if (!defined($starttime)) {
1699     $timediff = 0;
1700   } elsif (!defined($endtime)) {
1701     $timediff = time - $starttime;
1702   } else {
1703     $timediff = ($endtime - $starttime) - (time - $endtime);
1704   }
1705   if ($timediff > 0) {
1706     $retries = int(log($timediff) / log(2));
1707   } else {
1708     $retries = 1;  # Use the minimum.
1709   }
1710   return ($retries > 3) ? $retries : 3;
1711 }
1712
1713 sub retry_op {
1714   # Pass in two function references.
1715   # This method will be called with the remaining arguments.
1716   # If it dies, retry it with exponential backoff until it succeeds,
1717   # or until the current retry_count is exhausted.  After each failure
1718   # that can be retried, the second function will be called with
1719   # the current try count (0-based), next try time, and error message.
1720   my $operation = shift;
1721   my $retry_callback = shift;
1722   my $retries = retry_count();
1723   foreach my $try_count (0..$retries) {
1724     my $next_try = time + (2 ** $try_count);
1725     my $result = eval { $operation->(@_); };
1726     if (!$@) {
1727       return $result;
1728     } elsif ($try_count < $retries) {
1729       $retry_callback->($try_count, $next_try, $@);
1730       my $sleep_time = $next_try - time;
1731       sleep($sleep_time) if ($sleep_time > 0);
1732     }
1733   }
1734   # Ensure the error message ends in a newline, so Perl doesn't add
1735   # retry_op's line number to it.
1736   chomp($@);
1737   die($@ . "\n");
1738 }
1739
1740 sub api_call {
1741   # Pass in a /-separated API method name, and arguments for it.
1742   # This function will call that method, retrying as needed until
1743   # the current retry_count is exhausted, with a log on the first failure.
1744   my $method_name = shift;
1745   my $log_api_retry = sub {
1746     my ($try_count, $next_try_at, $errmsg) = @_;
1747     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1748     $errmsg =~ s/\s/ /g;
1749     $errmsg =~ s/\s+$//;
1750     my $retry_msg;
1751     if ($next_try_at < time) {
1752       $retry_msg = "Retrying.";
1753     } else {
1754       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1755       $retry_msg = "Retrying at $next_try_fmt.";
1756     }
1757     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1758   };
1759   my $method = $arv;
1760   foreach my $key (split(/\//, $method_name)) {
1761     $method = $method->{$key};
1762   }
1763   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1764 }
1765
1766 sub exit_status_s {
1767   # Given a $?, return a human-readable exit code string like "0" or
1768   # "1" or "0 with signal 1" or "1 with signal 11".
1769   my $exitcode = shift;
1770   my $s = $exitcode >> 8;
1771   if ($exitcode & 0x7f) {
1772     $s .= " with signal " . ($exitcode & 0x7f);
1773   }
1774   if ($exitcode & 0x80) {
1775     $s .= " with core dump";
1776   }
1777   return $s;
1778 }
1779
1780 sub handle_readall {
1781   # Pass in a glob reference to a file handle.
1782   # Read all its contents and return them as a string.
1783   my $fh_glob_ref = shift;
1784   local $/ = undef;
1785   return <$fh_glob_ref>;
1786 }
1787
1788 sub tar_filename_n {
1789   my $n = shift;
1790   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1791 }
1792
1793 sub add_git_archive {
1794   # Pass in a git archive command as a string or list, a la system().
1795   # This method will save its output to be included in the archive sent to the
1796   # build script.
1797   my $git_input;
1798   $git_tar_count++;
1799   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1800     croak("Failed to save git archive: $!");
1801   }
1802   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1803   close($git_input);
1804   waitpid($git_pid, 0);
1805   close(GIT_ARCHIVE);
1806   if ($?) {
1807     croak("Failed to save git archive: git exited " . exit_status_s($?));
1808   }
1809 }
1810
1811 sub combined_git_archive {
1812   # Combine all saved tar archives into a single archive, then return its
1813   # contents in a string.  Return undef if no archives have been saved.
1814   if ($git_tar_count < 1) {
1815     return undef;
1816   }
1817   my $base_tar_name = tar_filename_n(1);
1818   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1819     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1820     if ($tar_exit != 0) {
1821       croak("Error preparing build archive: tar -A exited " .
1822             exit_status_s($tar_exit));
1823     }
1824   }
1825   if (!open(GIT_TAR, "<", $base_tar_name)) {
1826     croak("Could not open build archive: $!");
1827   }
1828   my $tar_contents = handle_readall(\*GIT_TAR);
1829   close(GIT_TAR);
1830   return $tar_contents;
1831 }
1832
1833 __DATA__
1834 #!/usr/bin/perl
1835 #
1836 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1837 # server invokes this script on individual compute nodes, or localhost if we're
1838 # running a job locally.  It gets called in two modes:
1839 #
1840 # * No arguments: Installation mode.  Read a tar archive from the DATA
1841 #   file handle; it includes the Crunch script's source code, and
1842 #   maybe SDKs as well.  Those should be installed in the proper
1843 #   locations.  This runs outside of any Docker container, so don't try to
1844 #   introspect Crunch's runtime environment.
1845 #
1846 # * With arguments: Crunch script run mode.  This script should set up the
1847 #   environment, then run the command specified in the arguments.  This runs
1848 #   inside any Docker container.
1849
1850 use Fcntl ':flock';
1851 use File::Path qw( make_path remove_tree );
1852 use POSIX qw(getcwd);
1853
1854 use constant TASK_TEMPFAIL => 111;
1855
1856 # Map SDK subdirectories to the path environments they belong to.
1857 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1858
1859 my $destdir = $ENV{"CRUNCH_SRC"};
1860 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1861 my $repo = $ENV{"CRUNCH_SRC_URL"};
1862 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1863 my $job_work = $ENV{"JOB_WORK"};
1864 my $task_work = $ENV{"TASK_WORK"};
1865
1866 for my $dir ($destdir, $job_work, $task_work) {
1867   if ($dir) {
1868     make_path $dir;
1869     -e $dir or die "Failed to create temporary directory ($dir): $!";
1870   }
1871 }
1872
1873 if ($task_work) {
1874   remove_tree($task_work, {keep_root => 1});
1875 }
1876
1877 open(STDOUT_ORIG, ">&", STDOUT);
1878 open(STDERR_ORIG, ">&", STDERR);
1879 open(STDOUT, ">>", "$destdir.log");
1880 open(STDERR, ">&", STDOUT);
1881
1882 ### Crunch script run mode
1883 if (@ARGV) {
1884   # We want to do routine logging during task 0 only.  This gives the user
1885   # the information they need, but avoids repeating the information for every
1886   # task.
1887   my $Log;
1888   if ($ENV{TASK_SEQUENCE} eq "0") {
1889     $Log = sub {
1890       my $msg = shift;
1891       printf STDERR_ORIG "[Crunch] $msg\n", @_;
1892     };
1893   } else {
1894     $Log = sub { };
1895   }
1896
1897   my $python_src = "$install_dir/python";
1898   my $venv_dir = "$job_work/.arvados.venv";
1899   my $venv_built = -e "$venv_dir/bin/activate";
1900   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1901     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
1902                  "--python=python2.7", $venv_dir);
1903     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1904     $venv_built = 1;
1905     $Log->("Built Python SDK virtualenv");
1906   }
1907
1908   my $pip_bin = "pip";
1909   if ($venv_built) {
1910     $Log->("Running in Python SDK virtualenv");
1911     $pip_bin = "$venv_dir/bin/pip";
1912     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
1913     @ARGV = ("/bin/sh", "-ec",
1914              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
1915   } elsif (-d $python_src) {
1916     $Log->("Warning: virtualenv not found inside Docker container default " .
1917            "\$PATH. Can't install Python SDK.");
1918   }
1919
1920   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
1921   if ($pkgs) {
1922     $Log->("Using Arvados SDK:");
1923     foreach my $line (split /\n/, $pkgs) {
1924       $Log->($line);
1925     }
1926   } else {
1927     $Log->("Arvados SDK packages not found");
1928   }
1929
1930   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
1931     my $sdk_path = "$install_dir/$sdk_dir";
1932     if (-d $sdk_path) {
1933       if ($ENV{$sdk_envkey}) {
1934         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
1935       } else {
1936         $ENV{$sdk_envkey} = $sdk_path;
1937       }
1938       $Log->("Arvados SDK added to %s", $sdk_envkey);
1939     }
1940   }
1941
1942   close(STDOUT);
1943   close(STDERR);
1944   open(STDOUT, ">&", STDOUT_ORIG);
1945   open(STDERR, ">&", STDERR_ORIG);
1946   exec(@ARGV);
1947   die "Cannot exec `@ARGV`: $!";
1948 }
1949
1950 ### Installation mode
1951 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1952 flock L, LOCK_EX;
1953 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1954   # This version already installed -> nothing to do.
1955   exit(0);
1956 }
1957
1958 unlink "$destdir.commit";
1959 mkdir $destdir;
1960
1961 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
1962   die "Error launching 'tar -xC $destdir': $!";
1963 }
1964 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
1965 # get SIGPIPE.  We must feed it data incrementally.
1966 my $tar_input;
1967 while (read(DATA, $tar_input, 65536)) {
1968   print TARX $tar_input;
1969 }
1970 if(!close(TARX)) {
1971   die "'tar -xC $destdir' exited $?: $!";
1972 }
1973
1974 mkdir $install_dir;
1975
1976 my $sdk_root = "$destdir/.arvados.sdk/sdk";
1977 if (-d $sdk_root) {
1978   foreach my $sdk_lang (("python",
1979                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
1980     if (-d "$sdk_root/$sdk_lang") {
1981       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
1982         die "Failed to install $sdk_lang SDK: $!";
1983       }
1984     }
1985   }
1986 }
1987
1988 my $python_dir = "$install_dir/python";
1989 if ((-d $python_dir) and can_run("python2.7") and
1990     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
1991   # egg_info failed, probably when it asked git for a build tag.
1992   # Specify no build tag.
1993   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
1994   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
1995   close($pysdk_cfg);
1996 }
1997
1998 if (-e "$destdir/crunch_scripts/install") {
1999     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2000 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2001     # Old version
2002     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2003 } elsif (-e "./install.sh") {
2004     shell_or_die (undef, "./install.sh", $install_dir);
2005 }
2006
2007 if ($commit) {
2008     unlink "$destdir.commit.new";
2009     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
2010     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
2011 }
2012
2013 close L;
2014
2015 sub can_run {
2016   my $command_name = shift;
2017   open(my $which, "-|", "which", $command_name);
2018   while (<$which>) { }
2019   close($which);
2020   return ($? == 0);
2021 }
2022
2023 sub shell_or_die
2024 {
2025   my $exitcode = shift;
2026
2027   if ($ENV{"DEBUG"}) {
2028     print STDERR "@_\n";
2029   }
2030   if (system (@_) != 0) {
2031     my $err = $!;
2032     my $code = $?;
2033     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2034     open STDERR, ">&STDERR_ORIG";
2035     system ("cat $destdir.log >&2");
2036     warn "@_ failed ($err): $exitstatus";
2037     if (defined($exitcode)) {
2038       exit $exitcode;
2039     }
2040     else {
2041       exit (($code >> 8) || 1);
2042     }
2043   }
2044 }
2045
2046 __DATA__