5198: Merge branch 'master' into 5198-remote-link-ctrl-click
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105   if ($ENV{"USER"} ne "crunch" && $< != 0) {
106     # use a tmp dir unique for my uid
107     $ENV{"CRUNCH_TMP"} .= "-$<";
108   }
109 }
110
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
114 }
115
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
120
121 my $force_unlock;
122 my $git_dir;
123 my $jobspec;
124 my $job_api_token;
125 my $no_clear_tmp;
126 my $resume_stash;
127 GetOptions('force-unlock' => \$force_unlock,
128            'git-dir=s' => \$git_dir,
129            'job=s' => \$jobspec,
130            'job-api-token=s' => \$job_api_token,
131            'no-clear-tmp' => \$no_clear_tmp,
132            'resume-stash=s' => \$resume_stash,
133     );
134
135 if (defined $job_api_token) {
136   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
137 }
138
139 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
140 my $local_job = 0;
141
142
143 $SIG{'USR1'} = sub
144 {
145   $main::ENV{CRUNCH_DEBUG} = 1;
146 };
147 $SIG{'USR2'} = sub
148 {
149   $main::ENV{CRUNCH_DEBUG} = 0;
150 };
151
152
153
154 my $arv = Arvados->new('apiVersion' => 'v1');
155
156 my $Job;
157 my $job_id;
158 my $dbh;
159 my $sth;
160 my @jobstep;
161
162 my $User = api_call("users/current");
163
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   if (!$force_unlock) {
169     # Claim this job, and make sure nobody else does
170     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
171     if ($@) {
172       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
173       exit EX_TEMPFAIL;
174     };
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189   $Job->{'state'} = 'Running';
190
191   $Job = api_call("jobs/create", job => $Job);
192 }
193 $job_id = $Job->{'uuid'};
194
195 my $keep_logfile = $job_id . '.log.txt';
196 log_writer_start($keep_logfile);
197
198 $Job->{'runtime_constraints'} ||= {};
199 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
200 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
201
202 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
203 if ($? == 0) {
204   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
205   chomp($gem_versions);
206   chop($gem_versions);  # Closing parentheses
207 } else {
208   $gem_versions = "";
209 }
210 Log(undef,
211     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
212
213 Log (undef, "check slurm allocation");
214 my @slot;
215 my @node;
216 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
217 my @sinfo;
218 if (!$have_slurm)
219 {
220   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
221   push @sinfo, "$localcpus localhost";
222 }
223 if (exists $ENV{SLURM_NODELIST})
224 {
225   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
226 }
227 foreach (@sinfo)
228 {
229   my ($ncpus, $slurm_nodelist) = split;
230   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
231
232   my @nodelist;
233   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
234   {
235     my $nodelist = $1;
236     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
237     {
238       my $ranges = $1;
239       foreach (split (",", $ranges))
240       {
241         my ($a, $b);
242         if (/(\d+)-(\d+)/)
243         {
244           $a = $1;
245           $b = $2;
246         }
247         else
248         {
249           $a = $_;
250           $b = $_;
251         }
252         push @nodelist, map {
253           my $n = $nodelist;
254           $n =~ s/\[[-,\d]+\]/$_/;
255           $n;
256         } ($a..$b);
257       }
258     }
259     else
260     {
261       push @nodelist, $nodelist;
262     }
263   }
264   foreach my $nodename (@nodelist)
265   {
266     Log (undef, "node $nodename - $ncpus slots");
267     my $node = { name => $nodename,
268                  ncpus => $ncpus,
269                  losing_streak => 0,
270                  hold_until => 0 };
271     foreach my $cpu (1..$ncpus)
272     {
273       push @slot, { node => $node,
274                     cpu => $cpu };
275     }
276   }
277   push @node, @nodelist;
278 }
279
280
281
282 # Ensure that we get one jobstep running on each allocated node before
283 # we start overloading nodes with concurrent steps
284
285 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
286
287
288 $Job->update_attributes(
289   'tasks_summary' => { 'failed' => 0,
290                        'todo' => 1,
291                        'running' => 0,
292                        'done' => 0 });
293
294 Log (undef, "start");
295 $SIG{'INT'} = sub { $main::please_freeze = 1; };
296 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
297 $SIG{'TERM'} = \&croak;
298 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
299 $SIG{'ALRM'} = sub { $main::please_info = 1; };
300 $SIG{'CONT'} = sub { $main::please_continue = 1; };
301 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
302
303 $main::please_freeze = 0;
304 $main::please_info = 0;
305 $main::please_continue = 0;
306 $main::please_refresh = 0;
307 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
308
309 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
310 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
311 $ENV{"JOB_UUID"} = $job_id;
312
313
314 my @jobstep_todo = ();
315 my @jobstep_done = ();
316 my @jobstep_tomerge = ();
317 my $jobstep_tomerge_level = 0;
318 my $squeue_checked;
319 my $squeue_kill_checked;
320 my $latest_refresh = scalar time;
321
322
323
324 if (defined $Job->{thawedfromkey})
325 {
326   thaw ($Job->{thawedfromkey});
327 }
328 else
329 {
330   my $first_task = api_call("job_tasks/create", job_task => {
331     'job_uuid' => $Job->{'uuid'},
332     'sequence' => 0,
333     'qsequence' => 0,
334     'parameters' => {},
335   });
336   push @jobstep, { 'level' => 0,
337                    'failures' => 0,
338                    'arvados_task' => $first_task,
339                  };
340   push @jobstep_todo, 0;
341 }
342
343
344 if (!$have_slurm)
345 {
346   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
347 }
348
349 my $build_script = handle_readall(\*DATA);
350 my $nodelist = join(",", @node);
351 my $git_tar_count = 0;
352
353 if (!defined $no_clear_tmp) {
354   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
355   Log (undef, "Clean work dirs");
356
357   my $cleanpid = fork();
358   if ($cleanpid == 0)
359   {
360     # Find FUSE mounts that look like Keep mounts (the mount path has the
361     # word "keep") and unmount them.  Then clean up work directories.
362     # TODO: When #5036 is done and widely deployed, we can get rid of the
363     # regular expression and just unmount everything with type fuse.keep.
364     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
365           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
366     exit (1);
367   }
368   while (1)
369   {
370     last if $cleanpid == waitpid (-1, WNOHANG);
371     freeze_if_want_freeze ($cleanpid);
372     select (undef, undef, undef, 0.1);
373   }
374   Log (undef, "Cleanup command exited ".exit_status_s($?));
375 }
376
377 # If this job requires a Docker image, install that.
378 my $docker_bin = "/usr/bin/docker.io";
379 my ($docker_locator, $docker_stream, $docker_hash);
380 if ($docker_locator = $Job->{docker_image_locator}) {
381   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
382   if (!$docker_hash)
383   {
384     croak("No Docker image hash found from locator $docker_locator");
385   }
386   $docker_stream =~ s/^\.//;
387   my $docker_install_script = qq{
388 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
389     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
390 fi
391 };
392   my $docker_pid = fork();
393   if ($docker_pid == 0)
394   {
395     srun (["srun", "--nodelist=" . join(',', @node)],
396           ["/bin/sh", "-ec", $docker_install_script]);
397     exit ($?);
398   }
399   while (1)
400   {
401     last if $docker_pid == waitpid (-1, WNOHANG);
402     freeze_if_want_freeze ($docker_pid);
403     select (undef, undef, undef, 0.1);
404   }
405   if ($? != 0)
406   {
407     croak("Installing Docker image from $docker_locator exited "
408           .exit_status_s($?));
409   }
410
411   if ($Job->{arvados_sdk_version}) {
412     # The job also specifies an Arvados SDK version.  Add the SDKs to the
413     # tar file for the build script to install.
414     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
415                        $Job->{arvados_sdk_version}));
416     add_git_archive("git", "--git-dir=$git_dir", "archive",
417                     "--prefix=.arvados.sdk/",
418                     $Job->{arvados_sdk_version}, "sdk");
419   }
420 }
421
422 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
423   # If script_version looks like an absolute path, *and* the --git-dir
424   # argument was not given -- which implies we were not invoked by
425   # crunch-dispatch -- we will use the given path as a working
426   # directory instead of resolving script_version to a git commit (or
427   # doing anything else with git).
428   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
429   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
430 }
431 else {
432   # Resolve the given script_version to a git commit sha1. Also, if
433   # the repository is remote, clone it into our local filesystem: this
434   # ensures "git archive" will work, and is necessary to reliably
435   # resolve a symbolic script_version like "master^".
436   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
437
438   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
439
440   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
441
442   # If we're running under crunch-dispatch, it will have already
443   # pulled the appropriate source tree into its own repository, and
444   # given us that repo's path as $git_dir.
445   #
446   # If we're running a "local" job, we might have to fetch content
447   # from a remote repository.
448   #
449   # (Currently crunch-dispatch gives a local path with --git-dir, but
450   # we might as well accept URLs there too in case it changes its
451   # mind.)
452   my $repo = $git_dir || $Job->{'repository'};
453
454   # Repository can be remote or local. If remote, we'll need to fetch it
455   # to a local dir before doing `git log` et al.
456   my $repo_location;
457
458   if ($repo =~ m{://|^[^/]*:}) {
459     # $repo is a git url we can clone, like git:// or https:// or
460     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
461     # not recognized here because distinguishing that from a local
462     # path is too fragile. If you really need something strange here,
463     # use the ssh:// form.
464     $repo_location = 'remote';
465   } elsif ($repo =~ m{^\.*/}) {
466     # $repo is a local path to a git index. We'll also resolve ../foo
467     # to ../foo/.git if the latter is a directory. To help
468     # disambiguate local paths from named hosted repositories, this
469     # form must be given as ./ or ../ if it's a relative path.
470     if (-d "$repo/.git") {
471       $repo = "$repo/.git";
472     }
473     $repo_location = 'local';
474   } else {
475     # $repo is none of the above. It must be the name of a hosted
476     # repository.
477     my $arv_repo_list = api_call("repositories/list",
478                                  'filters' => [['name','=',$repo]]);
479     my @repos_found = @{$arv_repo_list->{'items'}};
480     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
481     if ($n_found > 0) {
482       Log(undef, "Repository '$repo' -> "
483           . join(", ", map { $_->{'uuid'} } @repos_found));
484     }
485     if ($n_found != 1) {
486       croak("Error: Found $n_found repositories with name '$repo'.");
487     }
488     $repo = $repos_found[0]->{'fetch_url'};
489     $repo_location = 'remote';
490   }
491   Log(undef, "Using $repo_location repository '$repo'");
492   $ENV{"CRUNCH_SRC_URL"} = $repo;
493
494   # Resolve given script_version (we'll call that $treeish here) to a
495   # commit sha1 ($commit).
496   my $treeish = $Job->{'script_version'};
497   my $commit;
498   if ($repo_location eq 'remote') {
499     # We minimize excess object-fetching by re-using the same bare
500     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
501     # just keep adding remotes to it as needed.
502     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
503     my $gitcmd = "git --git-dir=\Q$local_repo\E";
504
505     # Set up our local repo for caching remote objects, making
506     # archives, etc.
507     if (!-d $local_repo) {
508       make_path($local_repo) or croak("Error: could not create $local_repo");
509     }
510     # This works (exits 0 and doesn't delete fetched objects) even
511     # if $local_repo is already initialized:
512     `$gitcmd init --bare`;
513     if ($?) {
514       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
515     }
516
517     # If $treeish looks like a hash (or abbrev hash) we look it up in
518     # our local cache first, since that's cheaper. (We don't want to
519     # do that with tags/branches though -- those change over time, so
520     # they should always be resolved by the remote repo.)
521     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
522       # Hide stderr because it's normal for this to fail:
523       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
524       if ($? == 0 &&
525           # Careful not to resolve a branch named abcdeff to commit 1234567:
526           $sha1 =~ /^$treeish/ &&
527           $sha1 =~ /^([0-9a-f]{40})$/s) {
528         $commit = $1;
529         Log(undef, "Commit $commit already present in $local_repo");
530       }
531     }
532
533     if (!defined $commit) {
534       # If $treeish isn't just a hash or abbrev hash, or isn't here
535       # yet, we need to fetch the remote to resolve it correctly.
536
537       # First, remove all local heads. This prevents a name that does
538       # not exist on the remote from resolving to (or colliding with)
539       # a previously fetched branch or tag (possibly from a different
540       # remote).
541       remove_tree("$local_repo/refs/heads", {keep_root => 1});
542
543       Log(undef, "Fetching objects from $repo to $local_repo");
544       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
545       if ($?) {
546         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
547       }
548     }
549
550     # Now that the data is all here, we will use our local repo for
551     # the rest of our git activities.
552     $repo = $local_repo;
553   }
554
555   my $gitcmd = "git --git-dir=\Q$repo\E";
556   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
557   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
558     croak("`$gitcmd rev-list` exited "
559           .exit_status_s($?)
560           .", '$treeish' not found. Giving up.");
561   }
562   $commit = $1;
563   Log(undef, "Version $treeish is commit $commit");
564
565   if ($commit ne $Job->{'script_version'}) {
566     # Record the real commit id in the database, frozentokey, logs,
567     # etc. -- instead of an abbreviation or a branch name which can
568     # become ambiguous or point to a different commit in the future.
569     if (!$Job->update_attributes('script_version' => $commit)) {
570       croak("Error: failed to update job's script_version attribute");
571     }
572   }
573
574   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
575   add_git_archive("$gitcmd archive ''\Q$commit\E");
576 }
577
578 my $git_archive = combined_git_archive();
579 if (!defined $git_archive) {
580   Log(undef, "Skip install phase (no git archive)");
581   if ($have_slurm) {
582     Log(undef, "Warning: This probably means workers have no source tree!");
583   }
584 }
585 else {
586   Log(undef, "Run install script on all workers");
587
588   my @srunargs = ("srun",
589                   "--nodelist=$nodelist",
590                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
591   my @execargs = ("sh", "-c",
592                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
593
594   my $installpid = fork();
595   if ($installpid == 0)
596   {
597     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
598     exit (1);
599   }
600   while (1)
601   {
602     last if $installpid == waitpid (-1, WNOHANG);
603     freeze_if_want_freeze ($installpid);
604     select (undef, undef, undef, 0.1);
605   }
606   my $install_exited = $?;
607   Log (undef, "Install script exited ".exit_status_s($install_exited));
608   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
609     unlink($tar_filename);
610   }
611   exit (1) if $install_exited != 0;
612 }
613
614 foreach (qw (script script_version script_parameters runtime_constraints))
615 {
616   Log (undef,
617        "$_ " .
618        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
619 }
620 foreach (split (/\n/, $Job->{knobs}))
621 {
622   Log (undef, "knob " . $_);
623 }
624
625
626
627 $main::success = undef;
628
629
630
631 ONELEVEL:
632
633 my $thisround_succeeded = 0;
634 my $thisround_failed = 0;
635 my $thisround_failed_multiple = 0;
636
637 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
638                        or $a <=> $b } @jobstep_todo;
639 my $level = $jobstep[$jobstep_todo[0]]->{level};
640 Log (undef, "start level $level");
641
642
643
644 my %proc;
645 my @freeslot = (0..$#slot);
646 my @holdslot;
647 my %reader;
648 my $progress_is_dirty = 1;
649 my $progress_stats_updated = 0;
650
651 update_progress_stats();
652
653
654
655 THISROUND:
656 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
657 {
658   my $id = $jobstep_todo[$todo_ptr];
659   my $Jobstep = $jobstep[$id];
660   if ($Jobstep->{level} != $level)
661   {
662     next;
663   }
664
665   pipe $reader{$id}, "writer" or croak ($!);
666   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
667   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
668
669   my $childslot = $freeslot[0];
670   my $childnode = $slot[$childslot]->{node};
671   my $childslotname = join (".",
672                             $slot[$childslot]->{node}->{name},
673                             $slot[$childslot]->{cpu});
674
675   my $childpid = fork();
676   if ($childpid == 0)
677   {
678     $SIG{'INT'} = 'DEFAULT';
679     $SIG{'QUIT'} = 'DEFAULT';
680     $SIG{'TERM'} = 'DEFAULT';
681
682     foreach (values (%reader))
683     {
684       close($_);
685     }
686     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
687     open(STDOUT,">&writer");
688     open(STDERR,">&writer");
689
690     undef $dbh;
691     undef $sth;
692
693     delete $ENV{"GNUPGHOME"};
694     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
695     $ENV{"TASK_QSEQUENCE"} = $id;
696     $ENV{"TASK_SEQUENCE"} = $level;
697     $ENV{"JOB_SCRIPT"} = $Job->{script};
698     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
699       $param =~ tr/a-z/A-Z/;
700       $ENV{"JOB_PARAMETER_$param"} = $value;
701     }
702     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
703     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
704     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
705     $ENV{"HOME"} = $ENV{"TASK_WORK"};
706     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
707     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
708     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
709     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
710
711     $ENV{"GZIP"} = "-n";
712
713     my @srunargs = (
714       "srun",
715       "--nodelist=".$childnode->{name},
716       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
717       "--job-name=$job_id.$id.$$",
718         );
719     my $command =
720         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
721         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
722         ."&& cd $ENV{CRUNCH_TMP} ";
723     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
724     if ($docker_hash)
725     {
726       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
727       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
728       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
729
730       # Dynamically configure the container to use the host system as its
731       # DNS server.  Get the host's global addresses from the ip command,
732       # and turn them into docker --dns options using gawk.
733       $command .=
734           q{$(ip -o address show scope global |
735               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
736
737       # The source tree and $destdir directory (which we have
738       # installed on the worker host) are available in the container,
739       # under the same path.
740       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
741       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
742
743       # Currently, we make arv-mount's mount point appear at /keep
744       # inside the container (instead of using the same path as the
745       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
746       # crunch scripts and utilities must not rely on this. They must
747       # use $TASK_KEEPMOUNT.
748       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
749       $ENV{TASK_KEEPMOUNT} = "/keep";
750
751       # TASK_WORK is almost exactly like a docker data volume: it
752       # starts out empty, is writable, and persists until no
753       # containers use it any more. We don't use --volumes-from to
754       # share it with other containers: it is only accessible to this
755       # task, and it goes away when this task stops.
756       #
757       # However, a docker data volume is writable only by root unless
758       # the mount point already happens to exist in the container with
759       # different permissions. Therefore, we [1] assume /tmp already
760       # exists in the image and is writable by the crunch user; [2]
761       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
762       # writable if they are created by docker while setting up the
763       # other --volumes); and [3] create $TASK_WORK inside the
764       # container using $build_script.
765       $command .= "--volume=/tmp ";
766       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
767       $ENV{"HOME"} = $ENV{"TASK_WORK"};
768       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
769
770       # TODO: Share a single JOB_WORK volume across all task
771       # containers on a given worker node, and delete it when the job
772       # ends (and, in case that doesn't work, when the next job
773       # starts).
774       #
775       # For now, use the same approach as TASK_WORK above.
776       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
777
778       while (my ($env_key, $env_val) = each %ENV)
779       {
780         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
781           $command .= "--env=\Q$env_key=$env_val\E ";
782         }
783       }
784       $command .= "--env=\QHOME=$ENV{HOME}\E ";
785       $command .= "\Q$docker_hash\E ";
786       $command .= "stdbuf --output=0 --error=0 ";
787       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
788     } else {
789       # Non-docker run
790       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
791       $command .= "stdbuf --output=0 --error=0 ";
792       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
793     }
794
795     my @execargs = ('bash', '-c', $command);
796     srun (\@srunargs, \@execargs, undef, $build_script);
797     # exec() failed, we assume nothing happened.
798     die "srun() failed on build script\n";
799   }
800   close("writer");
801   if (!defined $childpid)
802   {
803     close $reader{$id};
804     delete $reader{$id};
805     next;
806   }
807   shift @freeslot;
808   $proc{$childpid} = { jobstep => $id,
809                        time => time,
810                        slot => $childslot,
811                        jobstepname => "$job_id.$id.$childpid",
812                      };
813   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
814   $slot[$childslot]->{pid} = $childpid;
815
816   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
817   Log ($id, "child $childpid started on $childslotname");
818   $Jobstep->{starttime} = time;
819   $Jobstep->{node} = $childnode->{name};
820   $Jobstep->{slotindex} = $childslot;
821   delete $Jobstep->{stderr};
822   delete $Jobstep->{finishtime};
823
824   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
825   $Jobstep->{'arvados_task'}->save;
826
827   splice @jobstep_todo, $todo_ptr, 1;
828   --$todo_ptr;
829
830   $progress_is_dirty = 1;
831
832   while (!@freeslot
833          ||
834          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
835   {
836     last THISROUND if $main::please_freeze || defined($main::success);
837     if ($main::please_info)
838     {
839       $main::please_info = 0;
840       freeze();
841       create_output_collection();
842       save_meta(1);
843       update_progress_stats();
844     }
845     my $gotsome
846         = readfrompipes ()
847         + reapchildren ();
848     if (!$gotsome)
849     {
850       check_refresh_wanted();
851       check_squeue();
852       update_progress_stats();
853       select (undef, undef, undef, 0.1);
854     }
855     elsif (time - $progress_stats_updated >= 30)
856     {
857       update_progress_stats();
858     }
859     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
860         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
861     {
862       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
863           .($thisround_failed+$thisround_succeeded)
864           .") -- giving up on this round";
865       Log (undef, $message);
866       last THISROUND;
867     }
868
869     # move slots from freeslot to holdslot (or back to freeslot) if necessary
870     for (my $i=$#freeslot; $i>=0; $i--) {
871       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
872         push @holdslot, (splice @freeslot, $i, 1);
873       }
874     }
875     for (my $i=$#holdslot; $i>=0; $i--) {
876       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
877         push @freeslot, (splice @holdslot, $i, 1);
878       }
879     }
880
881     # give up if no nodes are succeeding
882     if (!grep { $_->{node}->{losing_streak} == 0 &&
883                     $_->{node}->{hold_count} < 4 } @slot) {
884       my $message = "Every node has failed -- giving up on this round";
885       Log (undef, $message);
886       last THISROUND;
887     }
888   }
889 }
890
891
892 push @freeslot, splice @holdslot;
893 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
894
895
896 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
897 while (%proc)
898 {
899   if ($main::please_continue) {
900     $main::please_continue = 0;
901     goto THISROUND;
902   }
903   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
904   readfrompipes ();
905   if (!reapchildren())
906   {
907     check_refresh_wanted();
908     check_squeue();
909     update_progress_stats();
910     select (undef, undef, undef, 0.1);
911     killem (keys %proc) if $main::please_freeze;
912   }
913 }
914
915 update_progress_stats();
916 freeze_if_want_freeze();
917
918
919 if (!defined $main::success)
920 {
921   if (@jobstep_todo &&
922       $thisround_succeeded == 0 &&
923       ($thisround_failed == 0 || $thisround_failed > 4))
924   {
925     my $message = "stop because $thisround_failed tasks failed and none succeeded";
926     Log (undef, $message);
927     $main::success = 0;
928   }
929   if (!@jobstep_todo)
930   {
931     $main::success = 1;
932   }
933 }
934
935 goto ONELEVEL if !defined $main::success;
936
937
938 release_allocation();
939 freeze();
940 my $collated_output = &create_output_collection();
941
942 if (!$collated_output) {
943   Log (undef, "Failed to write output collection");
944 }
945 else {
946   Log(undef, "job output $collated_output");
947   $Job->update_attributes('output' => $collated_output);
948 }
949
950 Log (undef, "finish");
951
952 save_meta();
953
954 my $final_state;
955 if ($collated_output && $main::success) {
956   $final_state = 'Complete';
957 } else {
958   $final_state = 'Failed';
959 }
960 $Job->update_attributes('state' => $final_state);
961
962 exit (($final_state eq 'Complete') ? 0 : 1);
963
964
965
966 sub update_progress_stats
967 {
968   $progress_stats_updated = time;
969   return if !$progress_is_dirty;
970   my ($todo, $done, $running) = (scalar @jobstep_todo,
971                                  scalar @jobstep_done,
972                                  scalar @slot - scalar @freeslot - scalar @holdslot);
973   $Job->{'tasks_summary'} ||= {};
974   $Job->{'tasks_summary'}->{'todo'} = $todo;
975   $Job->{'tasks_summary'}->{'done'} = $done;
976   $Job->{'tasks_summary'}->{'running'} = $running;
977   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
978   Log (undef, "status: $done done, $running running, $todo todo");
979   $progress_is_dirty = 0;
980 }
981
982
983
984 sub reapchildren
985 {
986   my $pid = waitpid (-1, WNOHANG);
987   return 0 if $pid <= 0;
988
989   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
990                   . "."
991                   . $slot[$proc{$pid}->{slot}]->{cpu});
992   my $jobstepid = $proc{$pid}->{jobstep};
993   my $elapsed = time - $proc{$pid}->{time};
994   my $Jobstep = $jobstep[$jobstepid];
995
996   my $childstatus = $?;
997   my $exitvalue = $childstatus >> 8;
998   my $exitinfo = "exit ".exit_status_s($childstatus);
999   $Jobstep->{'arvados_task'}->reload;
1000   my $task_success = $Jobstep->{'arvados_task'}->{success};
1001
1002   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1003
1004   if (!defined $task_success) {
1005     # task did not indicate one way or the other --> fail
1006     $Jobstep->{'arvados_task'}->{success} = 0;
1007     $Jobstep->{'arvados_task'}->save;
1008     $task_success = 0;
1009   }
1010
1011   if (!$task_success)
1012   {
1013     my $temporary_fail;
1014     $temporary_fail ||= $Jobstep->{node_fail};
1015     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1016
1017     ++$thisround_failed;
1018     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1019
1020     # Check for signs of a failed or misconfigured node
1021     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1022         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1023       # Don't count this against jobstep failure thresholds if this
1024       # node is already suspected faulty and srun exited quickly
1025       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1026           $elapsed < 5) {
1027         Log ($jobstepid, "blaming failure on suspect node " .
1028              $slot[$proc{$pid}->{slot}]->{node}->{name});
1029         $temporary_fail ||= 1;
1030       }
1031       ban_node_by_slot($proc{$pid}->{slot});
1032     }
1033
1034     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1035                              ++$Jobstep->{'failures'},
1036                              $temporary_fail ? 'temporary ' : 'permanent',
1037                              $elapsed));
1038
1039     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1040       # Give up on this task, and the whole job
1041       $main::success = 0;
1042     }
1043     # Put this task back on the todo queue
1044     push @jobstep_todo, $jobstepid;
1045     $Job->{'tasks_summary'}->{'failed'}++;
1046   }
1047   else
1048   {
1049     ++$thisround_succeeded;
1050     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1051     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1052     push @jobstep_done, $jobstepid;
1053     Log ($jobstepid, "success in $elapsed seconds");
1054   }
1055   $Jobstep->{exitcode} = $childstatus;
1056   $Jobstep->{finishtime} = time;
1057   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1058   $Jobstep->{'arvados_task'}->save;
1059   process_stderr ($jobstepid, $task_success);
1060   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1061                            length($Jobstep->{'arvados_task'}->{output}),
1062                            $Jobstep->{'arvados_task'}->{output}));
1063
1064   close $reader{$jobstepid};
1065   delete $reader{$jobstepid};
1066   delete $slot[$proc{$pid}->{slot}]->{pid};
1067   push @freeslot, $proc{$pid}->{slot};
1068   delete $proc{$pid};
1069
1070   if ($task_success) {
1071     # Load new tasks
1072     my $newtask_list = [];
1073     my $newtask_results;
1074     do {
1075       $newtask_results = api_call(
1076         "job_tasks/list",
1077         'where' => {
1078           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1079         },
1080         'order' => 'qsequence',
1081         'offset' => scalar(@$newtask_list),
1082       );
1083       push(@$newtask_list, @{$newtask_results->{items}});
1084     } while (@{$newtask_results->{items}});
1085     foreach my $arvados_task (@$newtask_list) {
1086       my $jobstep = {
1087         'level' => $arvados_task->{'sequence'},
1088         'failures' => 0,
1089         'arvados_task' => $arvados_task
1090       };
1091       push @jobstep, $jobstep;
1092       push @jobstep_todo, $#jobstep;
1093     }
1094   }
1095
1096   $progress_is_dirty = 1;
1097   1;
1098 }
1099
1100 sub check_refresh_wanted
1101 {
1102   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1103   if (@stat && $stat[9] > $latest_refresh) {
1104     $latest_refresh = scalar time;
1105     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1106     for my $attr ('cancelled_at',
1107                   'cancelled_by_user_uuid',
1108                   'cancelled_by_client_uuid',
1109                   'state') {
1110       $Job->{$attr} = $Job2->{$attr};
1111     }
1112     if ($Job->{'state'} ne "Running") {
1113       if ($Job->{'state'} eq "Cancelled") {
1114         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1115       } else {
1116         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1117       }
1118       $main::success = 0;
1119       $main::please_freeze = 1;
1120     }
1121   }
1122 }
1123
1124 sub check_squeue
1125 {
1126   # return if the kill list was checked <4 seconds ago
1127   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1128   {
1129     return;
1130   }
1131   $squeue_kill_checked = time;
1132
1133   # use killem() on procs whose killtime is reached
1134   for (keys %proc)
1135   {
1136     if (exists $proc{$_}->{killtime}
1137         && $proc{$_}->{killtime} <= time)
1138     {
1139       killem ($_);
1140     }
1141   }
1142
1143   # return if the squeue was checked <60 seconds ago
1144   if (defined $squeue_checked && $squeue_checked > time - 60)
1145   {
1146     return;
1147   }
1148   $squeue_checked = time;
1149
1150   if (!$have_slurm)
1151   {
1152     # here is an opportunity to check for mysterious problems with local procs
1153     return;
1154   }
1155
1156   # get a list of steps still running
1157   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1158   chop @squeue;
1159   if ($squeue[-1] ne "ok")
1160   {
1161     return;
1162   }
1163   pop @squeue;
1164
1165   # which of my jobsteps are running, according to squeue?
1166   my %ok;
1167   foreach (@squeue)
1168   {
1169     if (/^(\d+)\.(\d+) (\S+)/)
1170     {
1171       if ($1 eq $ENV{SLURM_JOBID})
1172       {
1173         $ok{$3} = 1;
1174       }
1175     }
1176   }
1177
1178   # which of my active child procs (>60s old) were not mentioned by squeue?
1179   foreach (keys %proc)
1180   {
1181     if ($proc{$_}->{time} < time - 60
1182         && !exists $ok{$proc{$_}->{jobstepname}}
1183         && !exists $proc{$_}->{killtime})
1184     {
1185       # kill this proc if it hasn't exited in 30 seconds
1186       $proc{$_}->{killtime} = time + 30;
1187     }
1188   }
1189 }
1190
1191
1192 sub release_allocation
1193 {
1194   if ($have_slurm)
1195   {
1196     Log (undef, "release job allocation");
1197     system "scancel $ENV{SLURM_JOBID}";
1198   }
1199 }
1200
1201
1202 sub readfrompipes
1203 {
1204   my $gotsome = 0;
1205   foreach my $job (keys %reader)
1206   {
1207     my $buf;
1208     while (0 < sysread ($reader{$job}, $buf, 8192))
1209     {
1210       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1211       $jobstep[$job]->{stderr} .= $buf;
1212       preprocess_stderr ($job);
1213       if (length ($jobstep[$job]->{stderr}) > 16384)
1214       {
1215         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1216       }
1217       $gotsome = 1;
1218     }
1219   }
1220   return $gotsome;
1221 }
1222
1223
1224 sub preprocess_stderr
1225 {
1226   my $job = shift;
1227
1228   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1229     my $line = $1;
1230     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1231     Log ($job, "stderr $line");
1232     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1233       # whoa.
1234       $main::please_freeze = 1;
1235     }
1236     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure)/) {
1237       $jobstep[$job]->{node_fail} = 1;
1238       ban_node_by_slot($jobstep[$job]->{slotindex});
1239     }
1240   }
1241 }
1242
1243
1244 sub process_stderr
1245 {
1246   my $job = shift;
1247   my $task_success = shift;
1248   preprocess_stderr ($job);
1249
1250   map {
1251     Log ($job, "stderr $_");
1252   } split ("\n", $jobstep[$job]->{stderr});
1253 }
1254
1255 sub fetch_block
1256 {
1257   my $hash = shift;
1258   my $keep;
1259   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1260     Log(undef, "fetch_block run error from arv-get $hash: $!");
1261     return undef;
1262   }
1263   my $output_block = "";
1264   while (1) {
1265     my $buf;
1266     my $bytes = sysread($keep, $buf, 1024 * 1024);
1267     if (!defined $bytes) {
1268       Log(undef, "fetch_block read error from arv-get: $!");
1269       $output_block = undef;
1270       last;
1271     } elsif ($bytes == 0) {
1272       # sysread returns 0 at the end of the pipe.
1273       last;
1274     } else {
1275       # some bytes were read into buf.
1276       $output_block .= $buf;
1277     }
1278   }
1279   close $keep;
1280   if ($?) {
1281     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1282     $output_block = undef;
1283   }
1284   return $output_block;
1285 }
1286
1287 # Create a collection by concatenating the output of all tasks (each
1288 # task's output is either a manifest fragment, a locator for a
1289 # manifest fragment stored in Keep, or nothing at all). Return the
1290 # portable_data_hash of the new collection.
1291 sub create_output_collection
1292 {
1293   Log (undef, "collate");
1294
1295   my ($child_out, $child_in);
1296   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1297 import arvados
1298 import sys
1299 print (arvados.api("v1").collections().
1300        create(body={"manifest_text": sys.stdin.read()}).
1301        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1302 }, retry_count());
1303
1304   my $task_idx = -1;
1305   my $manifest_size = 0;
1306   for (@jobstep)
1307   {
1308     ++$task_idx;
1309     my $output = $_->{'arvados_task'}->{output};
1310     next if (!defined($output));
1311     my $next_write;
1312     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1313       $next_write = fetch_block($output);
1314     } else {
1315       $next_write = $output;
1316     }
1317     if (defined($next_write)) {
1318       if (!defined(syswrite($child_in, $next_write))) {
1319         # There's been an error writing.  Stop the loop.
1320         # We'll log details about the exit code later.
1321         last;
1322       } else {
1323         $manifest_size += length($next_write);
1324       }
1325     } else {
1326       my $uuid = $_->{'arvados_task'}->{'uuid'};
1327       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1328       $main::success = 0;
1329     }
1330   }
1331   close($child_in);
1332   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1333
1334   my $joboutput;
1335   my $s = IO::Select->new($child_out);
1336   if ($s->can_read(120)) {
1337     sysread($child_out, $joboutput, 1024 * 1024);
1338     waitpid($pid, 0);
1339     if ($?) {
1340       Log(undef, "output collection creation exited " . exit_status_s($?));
1341       $joboutput = undef;
1342     } else {
1343       chomp($joboutput);
1344     }
1345   } else {
1346     Log (undef, "timed out while creating output collection");
1347     foreach my $signal (2, 2, 2, 15, 15, 9) {
1348       kill($signal, $pid);
1349       last if waitpid($pid, WNOHANG) == -1;
1350       sleep(1);
1351     }
1352   }
1353   close($child_out);
1354
1355   return $joboutput;
1356 }
1357
1358
1359 sub killem
1360 {
1361   foreach (@_)
1362   {
1363     my $sig = 2;                # SIGINT first
1364     if (exists $proc{$_}->{"sent_$sig"} &&
1365         time - $proc{$_}->{"sent_$sig"} > 4)
1366     {
1367       $sig = 15;                # SIGTERM if SIGINT doesn't work
1368     }
1369     if (exists $proc{$_}->{"sent_$sig"} &&
1370         time - $proc{$_}->{"sent_$sig"} > 4)
1371     {
1372       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1373     }
1374     if (!exists $proc{$_}->{"sent_$sig"})
1375     {
1376       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1377       kill $sig, $_;
1378       select (undef, undef, undef, 0.1);
1379       if ($sig == 2)
1380       {
1381         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1382       }
1383       $proc{$_}->{"sent_$sig"} = time;
1384       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1385     }
1386   }
1387 }
1388
1389
1390 sub fhbits
1391 {
1392   my($bits);
1393   for (@_) {
1394     vec($bits,fileno($_),1) = 1;
1395   }
1396   $bits;
1397 }
1398
1399
1400 # Send log output to Keep via arv-put.
1401 #
1402 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1403 # $log_pipe_pid is the pid of the arv-put subprocess.
1404 #
1405 # The only functions that should access these variables directly are:
1406 #
1407 # log_writer_start($logfilename)
1408 #     Starts an arv-put pipe, reading data on stdin and writing it to
1409 #     a $logfilename file in an output collection.
1410 #
1411 # log_writer_send($txt)
1412 #     Writes $txt to the output log collection.
1413 #
1414 # log_writer_finish()
1415 #     Closes the arv-put pipe and returns the output that it produces.
1416 #
1417 # log_writer_is_active()
1418 #     Returns a true value if there is currently a live arv-put
1419 #     process, false otherwise.
1420 #
1421 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1422
1423 sub log_writer_start($)
1424 {
1425   my $logfilename = shift;
1426   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1427                         'arv-put',
1428                         '--portable-data-hash',
1429                         '--project-uuid', $Job->{owner_uuid},
1430                         '--retries', '3',
1431                         '--name', $logfilename,
1432                         '--filename', $logfilename,
1433                         '-');
1434 }
1435
1436 sub log_writer_send($)
1437 {
1438   my $txt = shift;
1439   print $log_pipe_in $txt;
1440 }
1441
1442 sub log_writer_finish()
1443 {
1444   return unless $log_pipe_pid;
1445
1446   close($log_pipe_in);
1447   my $arv_put_output;
1448
1449   my $s = IO::Select->new($log_pipe_out);
1450   if ($s->can_read(120)) {
1451     sysread($log_pipe_out, $arv_put_output, 1024);
1452     chomp($arv_put_output);
1453   } else {
1454     Log (undef, "timed out reading from 'arv-put'");
1455   }
1456
1457   waitpid($log_pipe_pid, 0);
1458   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1459   if ($?) {
1460     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1461   }
1462
1463   return $arv_put_output;
1464 }
1465
1466 sub log_writer_is_active() {
1467   return $log_pipe_pid;
1468 }
1469
1470 sub Log                         # ($jobstep_id, $logmessage)
1471 {
1472   if ($_[1] =~ /\n/) {
1473     for my $line (split (/\n/, $_[1])) {
1474       Log ($_[0], $line);
1475     }
1476     return;
1477   }
1478   my $fh = select STDERR; $|=1; select $fh;
1479   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1480   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1481   $message .= "\n";
1482   my $datetime;
1483   if (log_writer_is_active() || -t STDERR) {
1484     my @gmtime = gmtime;
1485     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1486                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1487   }
1488   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1489
1490   if (log_writer_is_active()) {
1491     log_writer_send($datetime . " " . $message);
1492   }
1493 }
1494
1495
1496 sub croak
1497 {
1498   my ($package, $file, $line) = caller;
1499   my $message = "@_ at $file line $line\n";
1500   Log (undef, $message);
1501   freeze() if @jobstep_todo;
1502   create_output_collection() if @jobstep_todo;
1503   cleanup();
1504   save_meta();
1505   die;
1506 }
1507
1508
1509 sub cleanup
1510 {
1511   return unless $Job;
1512   if ($Job->{'state'} eq 'Cancelled') {
1513     $Job->update_attributes('finished_at' => scalar gmtime);
1514   } else {
1515     $Job->update_attributes('state' => 'Failed');
1516   }
1517 }
1518
1519
1520 sub save_meta
1521 {
1522   my $justcheckpoint = shift; # false if this will be the last meta saved
1523   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1524   return unless log_writer_is_active();
1525
1526   my $loglocator = log_writer_finish();
1527   Log (undef, "log manifest is $loglocator");
1528   $Job->{'log'} = $loglocator;
1529   $Job->update_attributes('log', $loglocator);
1530 }
1531
1532
1533 sub freeze_if_want_freeze
1534 {
1535   if ($main::please_freeze)
1536   {
1537     release_allocation();
1538     if (@_)
1539     {
1540       # kill some srun procs before freeze+stop
1541       map { $proc{$_} = {} } @_;
1542       while (%proc)
1543       {
1544         killem (keys %proc);
1545         select (undef, undef, undef, 0.1);
1546         my $died;
1547         while (($died = waitpid (-1, WNOHANG)) > 0)
1548         {
1549           delete $proc{$died};
1550         }
1551       }
1552     }
1553     freeze();
1554     create_output_collection();
1555     cleanup();
1556     save_meta();
1557     exit 1;
1558   }
1559 }
1560
1561
1562 sub freeze
1563 {
1564   Log (undef, "Freeze not implemented");
1565   return;
1566 }
1567
1568
1569 sub thaw
1570 {
1571   croak ("Thaw not implemented");
1572 }
1573
1574
1575 sub freezequote
1576 {
1577   my $s = shift;
1578   $s =~ s/\\/\\\\/g;
1579   $s =~ s/\n/\\n/g;
1580   return $s;
1581 }
1582
1583
1584 sub freezeunquote
1585 {
1586   my $s = shift;
1587   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1588   return $s;
1589 }
1590
1591
1592 sub srun
1593 {
1594   my $srunargs = shift;
1595   my $execargs = shift;
1596   my $opts = shift || {};
1597   my $stdin = shift;
1598   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1599
1600   $Data::Dumper::Terse = 1;
1601   $Data::Dumper::Indent = 0;
1602   my $show_cmd = Dumper($args);
1603   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1604   $show_cmd =~ s/\n/ /g;
1605   warn "starting: $show_cmd\n";
1606
1607   if (defined $stdin) {
1608     my $child = open STDIN, "-|";
1609     defined $child or die "no fork: $!";
1610     if ($child == 0) {
1611       print $stdin or die $!;
1612       close STDOUT or die $!;
1613       exit 0;
1614     }
1615   }
1616
1617   return system (@$args) if $opts->{fork};
1618
1619   exec @$args;
1620   warn "ENV size is ".length(join(" ",%ENV));
1621   die "exec failed: $!: @$args";
1622 }
1623
1624
1625 sub ban_node_by_slot {
1626   # Don't start any new jobsteps on this node for 60 seconds
1627   my $slotid = shift;
1628   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1629   $slot[$slotid]->{node}->{hold_count}++;
1630   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1631 }
1632
1633 sub must_lock_now
1634 {
1635   my ($lockfile, $error_message) = @_;
1636   open L, ">", $lockfile or croak("$lockfile: $!");
1637   if (!flock L, LOCK_EX|LOCK_NB) {
1638     croak("Can't lock $lockfile: $error_message\n");
1639   }
1640 }
1641
1642 sub find_docker_image {
1643   # Given a Keep locator, check to see if it contains a Docker image.
1644   # If so, return its stream name and Docker hash.
1645   # If not, return undef for both values.
1646   my $locator = shift;
1647   my ($streamname, $filename);
1648   my $image = api_call("collections/get", uuid => $locator);
1649   if ($image) {
1650     foreach my $line (split(/\n/, $image->{manifest_text})) {
1651       my @tokens = split(/\s+/, $line);
1652       next if (!@tokens);
1653       $streamname = shift(@tokens);
1654       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1655         if (defined($filename)) {
1656           return (undef, undef);  # More than one file in the Collection.
1657         } else {
1658           $filename = (split(/:/, $filedata, 3))[2];
1659         }
1660       }
1661     }
1662   }
1663   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1664     return ($streamname, $1);
1665   } else {
1666     return (undef, undef);
1667   }
1668 }
1669
1670 sub retry_count {
1671   # Calculate the number of times an operation should be retried,
1672   # assuming exponential backoff, and that we're willing to retry as
1673   # long as tasks have been running.  Enforce a minimum of 3 retries.
1674   my ($starttime, $endtime, $timediff, $retries);
1675   if (@jobstep) {
1676     $starttime = $jobstep[0]->{starttime};
1677     $endtime = $jobstep[-1]->{finishtime};
1678   }
1679   if (!defined($starttime)) {
1680     $timediff = 0;
1681   } elsif (!defined($endtime)) {
1682     $timediff = time - $starttime;
1683   } else {
1684     $timediff = ($endtime - $starttime) - (time - $endtime);
1685   }
1686   if ($timediff > 0) {
1687     $retries = int(log($timediff) / log(2));
1688   } else {
1689     $retries = 1;  # Use the minimum.
1690   }
1691   return ($retries > 3) ? $retries : 3;
1692 }
1693
1694 sub retry_op {
1695   # Pass in two function references.
1696   # This method will be called with the remaining arguments.
1697   # If it dies, retry it with exponential backoff until it succeeds,
1698   # or until the current retry_count is exhausted.  After each failure
1699   # that can be retried, the second function will be called with
1700   # the current try count (0-based), next try time, and error message.
1701   my $operation = shift;
1702   my $retry_callback = shift;
1703   my $retries = retry_count();
1704   foreach my $try_count (0..$retries) {
1705     my $next_try = time + (2 ** $try_count);
1706     my $result = eval { $operation->(@_); };
1707     if (!$@) {
1708       return $result;
1709     } elsif ($try_count < $retries) {
1710       $retry_callback->($try_count, $next_try, $@);
1711       my $sleep_time = $next_try - time;
1712       sleep($sleep_time) if ($sleep_time > 0);
1713     }
1714   }
1715   # Ensure the error message ends in a newline, so Perl doesn't add
1716   # retry_op's line number to it.
1717   chomp($@);
1718   die($@ . "\n");
1719 }
1720
1721 sub api_call {
1722   # Pass in a /-separated API method name, and arguments for it.
1723   # This function will call that method, retrying as needed until
1724   # the current retry_count is exhausted, with a log on the first failure.
1725   my $method_name = shift;
1726   my $log_api_retry = sub {
1727     my ($try_count, $next_try_at, $errmsg) = @_;
1728     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1729     $errmsg =~ s/\s/ /g;
1730     $errmsg =~ s/\s+$//;
1731     my $retry_msg;
1732     if ($next_try_at < time) {
1733       $retry_msg = "Retrying.";
1734     } else {
1735       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1736       $retry_msg = "Retrying at $next_try_fmt.";
1737     }
1738     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1739   };
1740   my $method = $arv;
1741   foreach my $key (split(/\//, $method_name)) {
1742     $method = $method->{$key};
1743   }
1744   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1745 }
1746
1747 sub exit_status_s {
1748   # Given a $?, return a human-readable exit code string like "0" or
1749   # "1" or "0 with signal 1" or "1 with signal 11".
1750   my $exitcode = shift;
1751   my $s = $exitcode >> 8;
1752   if ($exitcode & 0x7f) {
1753     $s .= " with signal " . ($exitcode & 0x7f);
1754   }
1755   if ($exitcode & 0x80) {
1756     $s .= " with core dump";
1757   }
1758   return $s;
1759 }
1760
1761 sub handle_readall {
1762   # Pass in a glob reference to a file handle.
1763   # Read all its contents and return them as a string.
1764   my $fh_glob_ref = shift;
1765   local $/ = undef;
1766   return <$fh_glob_ref>;
1767 }
1768
1769 sub tar_filename_n {
1770   my $n = shift;
1771   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1772 }
1773
1774 sub add_git_archive {
1775   # Pass in a git archive command as a string or list, a la system().
1776   # This method will save its output to be included in the archive sent to the
1777   # build script.
1778   my $git_input;
1779   $git_tar_count++;
1780   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1781     croak("Failed to save git archive: $!");
1782   }
1783   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1784   close($git_input);
1785   waitpid($git_pid, 0);
1786   close(GIT_ARCHIVE);
1787   if ($?) {
1788     croak("Failed to save git archive: git exited " . exit_status_s($?));
1789   }
1790 }
1791
1792 sub combined_git_archive {
1793   # Combine all saved tar archives into a single archive, then return its
1794   # contents in a string.  Return undef if no archives have been saved.
1795   if ($git_tar_count < 1) {
1796     return undef;
1797   }
1798   my $base_tar_name = tar_filename_n(1);
1799   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1800     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1801     if ($tar_exit != 0) {
1802       croak("Error preparing build archive: tar -A exited " .
1803             exit_status_s($tar_exit));
1804     }
1805   }
1806   if (!open(GIT_TAR, "<", $base_tar_name)) {
1807     croak("Could not open build archive: $!");
1808   }
1809   my $tar_contents = handle_readall(\*GIT_TAR);
1810   close(GIT_TAR);
1811   return $tar_contents;
1812 }
1813
1814 __DATA__
1815 #!/usr/bin/perl
1816 #
1817 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1818 # server invokes this script on individual compute nodes, or localhost if we're
1819 # running a job locally.  It gets called in two modes:
1820 #
1821 # * No arguments: Installation mode.  Read a tar archive from the DATA
1822 #   file handle; it includes the Crunch script's source code, and
1823 #   maybe SDKs as well.  Those should be installed in the proper
1824 #   locations.  This runs outside of any Docker container, so don't try to
1825 #   introspect Crunch's runtime environment.
1826 #
1827 # * With arguments: Crunch script run mode.  This script should set up the
1828 #   environment, then run the command specified in the arguments.  This runs
1829 #   inside any Docker container.
1830
1831 use Fcntl ':flock';
1832 use File::Path qw( make_path remove_tree );
1833 use POSIX qw(getcwd);
1834
1835 use constant TASK_TEMPFAIL => 111;
1836
1837 # Map SDK subdirectories to the path environments they belong to.
1838 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1839
1840 my $destdir = $ENV{"CRUNCH_SRC"};
1841 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1842 my $repo = $ENV{"CRUNCH_SRC_URL"};
1843 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1844 my $job_work = $ENV{"JOB_WORK"};
1845 my $task_work = $ENV{"TASK_WORK"};
1846
1847 for my $dir ($destdir, $job_work, $task_work) {
1848   if ($dir) {
1849     make_path $dir;
1850     -e $dir or die "Failed to create temporary directory ($dir): $!";
1851   }
1852 }
1853
1854 if ($task_work) {
1855   remove_tree($task_work, {keep_root => 1});
1856 }
1857
1858 open(STDOUT_ORIG, ">&", STDOUT);
1859 open(STDERR_ORIG, ">&", STDERR);
1860 open(STDOUT, ">>", "$destdir.log");
1861 open(STDERR, ">&", STDOUT);
1862
1863 ### Crunch script run mode
1864 if (@ARGV) {
1865   # We want to do routine logging during task 0 only.  This gives the user
1866   # the information they need, but avoids repeating the information for every
1867   # task.
1868   my $Log;
1869   if ($ENV{TASK_SEQUENCE} eq "0") {
1870     $Log = sub {
1871       my $msg = shift;
1872       printf STDERR_ORIG "[Crunch] $msg\n", @_;
1873     };
1874   } else {
1875     $Log = sub { };
1876   }
1877
1878   my $python_src = "$install_dir/python";
1879   my $venv_dir = "$job_work/.arvados.venv";
1880   my $venv_built = -e "$venv_dir/bin/activate";
1881   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1882     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
1883                  "--python=python2.7", $venv_dir);
1884     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1885     $venv_built = 1;
1886     $Log->("Built Python SDK virtualenv");
1887   }
1888
1889   my $pip_bin = "pip";
1890   if ($venv_built) {
1891     $Log->("Running in Python SDK virtualenv");
1892     $pip_bin = "$venv_dir/bin/pip";
1893     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
1894     @ARGV = ("/bin/sh", "-ec",
1895              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
1896   } elsif (-d $python_src) {
1897     $Log->("Warning: virtualenv not found inside Docker container default " .
1898            "\$PATH. Can't install Python SDK.");
1899   }
1900
1901   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
1902   if ($pkgs) {
1903     $Log->("Using Arvados SDK:");
1904     foreach my $line (split /\n/, $pkgs) {
1905       $Log->($line);
1906     }
1907   } else {
1908     $Log->("Arvados SDK packages not found");
1909   }
1910
1911   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
1912     my $sdk_path = "$install_dir/$sdk_dir";
1913     if (-d $sdk_path) {
1914       if ($ENV{$sdk_envkey}) {
1915         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
1916       } else {
1917         $ENV{$sdk_envkey} = $sdk_path;
1918       }
1919       $Log->("Arvados SDK added to %s", $sdk_envkey);
1920     }
1921   }
1922
1923   close(STDOUT);
1924   close(STDERR);
1925   open(STDOUT, ">&", STDOUT_ORIG);
1926   open(STDERR, ">&", STDERR_ORIG);
1927   exec(@ARGV);
1928   die "Cannot exec `@ARGV`: $!";
1929 }
1930
1931 ### Installation mode
1932 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1933 flock L, LOCK_EX;
1934 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1935   # This version already installed -> nothing to do.
1936   exit(0);
1937 }
1938
1939 unlink "$destdir.commit";
1940 mkdir $destdir;
1941
1942 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
1943   die "Error launching 'tar -xC $destdir': $!";
1944 }
1945 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
1946 # get SIGPIPE.  We must feed it data incrementally.
1947 my $tar_input;
1948 while (read(DATA, $tar_input, 65536)) {
1949   print TARX $tar_input;
1950 }
1951 if(!close(TARX)) {
1952   die "'tar -xC $destdir' exited $?: $!";
1953 }
1954
1955 mkdir $install_dir;
1956
1957 my $sdk_root = "$destdir/.arvados.sdk/sdk";
1958 if (-d $sdk_root) {
1959   foreach my $sdk_lang (("python",
1960                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
1961     if (-d "$sdk_root/$sdk_lang") {
1962       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
1963         die "Failed to install $sdk_lang SDK: $!";
1964       }
1965     }
1966   }
1967 }
1968
1969 my $python_dir = "$install_dir/python";
1970 if ((-d $python_dir) and can_run("python2.7") and
1971     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
1972   # egg_info failed, probably when it asked git for a build tag.
1973   # Specify no build tag.
1974   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
1975   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
1976   close($pysdk_cfg);
1977 }
1978
1979 if (-e "$destdir/crunch_scripts/install") {
1980     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
1981 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1982     # Old version
1983     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
1984 } elsif (-e "./install.sh") {
1985     shell_or_die (undef, "./install.sh", $install_dir);
1986 }
1987
1988 if ($commit) {
1989     unlink "$destdir.commit.new";
1990     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1991     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1992 }
1993
1994 close L;
1995
1996 sub can_run {
1997   my $command_name = shift;
1998   open(my $which, "-|", "which", $command_name);
1999   while (<$which>) { }
2000   close($which);
2001   return ($? == 0);
2002 }
2003
2004 sub shell_or_die
2005 {
2006   my $exitcode = shift;
2007
2008   if ($ENV{"DEBUG"}) {
2009     print STDERR "@_\n";
2010   }
2011   if (system (@_) != 0) {
2012     my $err = $!;
2013     my $code = $?;
2014     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2015     open STDERR, ">&STDERR_ORIG";
2016     system ("cat $destdir.log >&2");
2017     warn "@_ failed ($err): $exitstatus";
2018     if (defined($exitcode)) {
2019       exit $exitcode;
2020     }
2021     else {
2022       exit (($code >> 8) || 1);
2023     }
2024   }
2025 }
2026
2027 __DATA__