5448: Add --all to "docker images" because it turns out it doesn't list all the insta...
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant EX_TEMPFAIL => 75;
100
101 $ENV{"TMPDIR"} ||= "/tmp";
102 unless (defined $ENV{"CRUNCH_TMP"}) {
103   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
104   if ($ENV{"USER"} ne "crunch" && $< != 0) {
105     # use a tmp dir unique for my uid
106     $ENV{"CRUNCH_TMP"} .= "-$<";
107   }
108 }
109
110 # Create the tmp directory if it does not exist
111 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
112   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
113 }
114
115 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
116 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
117 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
118 mkdir ($ENV{"JOB_WORK"});
119
120 my $force_unlock;
121 my $git_dir;
122 my $jobspec;
123 my $job_api_token;
124 my $no_clear_tmp;
125 my $resume_stash;
126 GetOptions('force-unlock' => \$force_unlock,
127            'git-dir=s' => \$git_dir,
128            'job=s' => \$jobspec,
129            'job-api-token=s' => \$job_api_token,
130            'no-clear-tmp' => \$no_clear_tmp,
131            'resume-stash=s' => \$resume_stash,
132     );
133
134 if (defined $job_api_token) {
135   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
136 }
137
138 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
139 my $local_job = 0;
140
141
142 $SIG{'USR1'} = sub
143 {
144   $main::ENV{CRUNCH_DEBUG} = 1;
145 };
146 $SIG{'USR2'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 0;
149 };
150
151
152
153 my $arv = Arvados->new('apiVersion' => 'v1');
154
155 my $Job;
156 my $job_id;
157 my $dbh;
158 my $sth;
159 my @jobstep;
160
161 my $User = api_call("users/current");
162
163 if ($jobspec =~ /^[-a-z\d]+$/)
164 {
165   # $jobspec is an Arvados UUID, not a JSON job specification
166   $Job = api_call("jobs/get", uuid => $jobspec);
167   if (!$force_unlock) {
168     # Claim this job, and make sure nobody else does
169     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
170     if ($@) {
171       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
172       exit EX_TEMPFAIL;
173     };
174   }
175 }
176 else
177 {
178   $Job = JSON::decode_json($jobspec);
179
180   if (!$resume_stash)
181   {
182     map { croak ("No $_ specified") unless $Job->{$_} }
183     qw(script script_version script_parameters);
184   }
185
186   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
187   $Job->{'started_at'} = gmtime;
188   $Job->{'state'} = 'Running';
189
190   $Job = api_call("jobs/create", job => $Job);
191 }
192 $job_id = $Job->{'uuid'};
193
194 my $keep_logfile = $job_id . '.log.txt';
195 log_writer_start($keep_logfile);
196
197 $Job->{'runtime_constraints'} ||= {};
198 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
199 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
200
201 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
202 if ($? == 0) {
203   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
204   chomp($gem_versions);
205   chop($gem_versions);  # Closing parentheses
206 } else {
207   $gem_versions = "";
208 }
209 Log(undef,
210     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
211
212 Log (undef, "check slurm allocation");
213 my @slot;
214 my @node;
215 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
216 my @sinfo;
217 if (!$have_slurm)
218 {
219   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
220   push @sinfo, "$localcpus localhost";
221 }
222 if (exists $ENV{SLURM_NODELIST})
223 {
224   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
225 }
226 foreach (@sinfo)
227 {
228   my ($ncpus, $slurm_nodelist) = split;
229   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
230
231   my @nodelist;
232   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
233   {
234     my $nodelist = $1;
235     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
236     {
237       my $ranges = $1;
238       foreach (split (",", $ranges))
239       {
240         my ($a, $b);
241         if (/(\d+)-(\d+)/)
242         {
243           $a = $1;
244           $b = $2;
245         }
246         else
247         {
248           $a = $_;
249           $b = $_;
250         }
251         push @nodelist, map {
252           my $n = $nodelist;
253           $n =~ s/\[[-,\d]+\]/$_/;
254           $n;
255         } ($a..$b);
256       }
257     }
258     else
259     {
260       push @nodelist, $nodelist;
261     }
262   }
263   foreach my $nodename (@nodelist)
264   {
265     Log (undef, "node $nodename - $ncpus slots");
266     my $node = { name => $nodename,
267                  ncpus => $ncpus,
268                  losing_streak => 0,
269                  hold_until => 0 };
270     foreach my $cpu (1..$ncpus)
271     {
272       push @slot, { node => $node,
273                     cpu => $cpu };
274     }
275   }
276   push @node, @nodelist;
277 }
278
279
280
281 # Ensure that we get one jobstep running on each allocated node before
282 # we start overloading nodes with concurrent steps
283
284 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
285
286
287 $Job->update_attributes(
288   'tasks_summary' => { 'failed' => 0,
289                        'todo' => 1,
290                        'running' => 0,
291                        'done' => 0 });
292
293 Log (undef, "start");
294 $SIG{'INT'} = sub { $main::please_freeze = 1; };
295 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
296 $SIG{'TERM'} = \&croak;
297 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
298 $SIG{'ALRM'} = sub { $main::please_info = 1; };
299 $SIG{'CONT'} = sub { $main::please_continue = 1; };
300 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
301
302 $main::please_freeze = 0;
303 $main::please_info = 0;
304 $main::please_continue = 0;
305 $main::please_refresh = 0;
306 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
307
308 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
309 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
310 $ENV{"JOB_UUID"} = $job_id;
311
312
313 my @jobstep_todo = ();
314 my @jobstep_done = ();
315 my @jobstep_tomerge = ();
316 my $jobstep_tomerge_level = 0;
317 my $squeue_checked;
318 my $squeue_kill_checked;
319 my $latest_refresh = scalar time;
320
321
322
323 if (defined $Job->{thawedfromkey})
324 {
325   thaw ($Job->{thawedfromkey});
326 }
327 else
328 {
329   my $first_task = api_call("job_tasks/create", job_task => {
330     'job_uuid' => $Job->{'uuid'},
331     'sequence' => 0,
332     'qsequence' => 0,
333     'parameters' => {},
334   });
335   push @jobstep, { 'level' => 0,
336                    'failures' => 0,
337                    'arvados_task' => $first_task,
338                  };
339   push @jobstep_todo, 0;
340 }
341
342
343 if (!$have_slurm)
344 {
345   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
346 }
347
348 my $build_script = handle_readall(\*DATA);
349 my $nodelist = join(",", @node);
350 my $git_tar_count = 0;
351
352 if (!defined $no_clear_tmp) {
353   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
354   Log (undef, "Clean work dirs");
355
356   my $cleanpid = fork();
357   if ($cleanpid == 0)
358   {
359     # Find FUSE mounts that look like Keep mounts (the mount path has the
360     # word "keep") and unmount them.  Then clean up work directories.
361     # TODO: When #5036 is done and widely deployed, we can get rid of the
362     # regular expression and just unmount everything with type fuse.keep.
363     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
364           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
365     exit (1);
366   }
367   while (1)
368   {
369     last if $cleanpid == waitpid (-1, WNOHANG);
370     freeze_if_want_freeze ($cleanpid);
371     select (undef, undef, undef, 0.1);
372   }
373   Log (undef, "Cleanup command exited ".exit_status_s($?));
374 }
375
376 # If this job requires a Docker image, install that.
377 my $docker_bin = "/usr/bin/docker.io";
378 my ($docker_locator, $docker_stream, $docker_hash);
379 if ($docker_locator = $Job->{docker_image_locator}) {
380   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
381   if (!$docker_hash)
382   {
383     croak("No Docker image hash found from locator $docker_locator");
384   }
385   $docker_stream =~ s/^\.//;
386   my $docker_install_script = qq{
387 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
388     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
389 fi
390 };
391   my $docker_pid = fork();
392   if ($docker_pid == 0)
393   {
394     srun (["srun", "--nodelist=" . join(',', @node)],
395           ["/bin/sh", "-ec", $docker_install_script]);
396     exit ($?);
397   }
398   while (1)
399   {
400     last if $docker_pid == waitpid (-1, WNOHANG);
401     freeze_if_want_freeze ($docker_pid);
402     select (undef, undef, undef, 0.1);
403   }
404   if ($? != 0)
405   {
406     croak("Installing Docker image from $docker_locator exited "
407           .exit_status_s($?));
408   }
409
410   if ($Job->{arvados_sdk_version}) {
411     # The job also specifies an Arvados SDK version.  Add the SDKs to the
412     # tar file for the build script to install.
413     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
414                        $Job->{arvados_sdk_version}));
415     add_git_archive("git", "--git-dir=$git_dir", "archive",
416                     "--prefix=.arvados.sdk/",
417                     $Job->{arvados_sdk_version}, "sdk");
418   }
419 }
420
421 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
422   # If script_version looks like an absolute path, *and* the --git-dir
423   # argument was not given -- which implies we were not invoked by
424   # crunch-dispatch -- we will use the given path as a working
425   # directory instead of resolving script_version to a git commit (or
426   # doing anything else with git).
427   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
428   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
429 }
430 else {
431   # Resolve the given script_version to a git commit sha1. Also, if
432   # the repository is remote, clone it into our local filesystem: this
433   # ensures "git archive" will work, and is necessary to reliably
434   # resolve a symbolic script_version like "master^".
435   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
436
437   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
438
439   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
440
441   # If we're running under crunch-dispatch, it will have already
442   # pulled the appropriate source tree into its own repository, and
443   # given us that repo's path as $git_dir.
444   #
445   # If we're running a "local" job, we might have to fetch content
446   # from a remote repository.
447   #
448   # (Currently crunch-dispatch gives a local path with --git-dir, but
449   # we might as well accept URLs there too in case it changes its
450   # mind.)
451   my $repo = $git_dir || $Job->{'repository'};
452
453   # Repository can be remote or local. If remote, we'll need to fetch it
454   # to a local dir before doing `git log` et al.
455   my $repo_location;
456
457   if ($repo =~ m{://|^[^/]*:}) {
458     # $repo is a git url we can clone, like git:// or https:// or
459     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
460     # not recognized here because distinguishing that from a local
461     # path is too fragile. If you really need something strange here,
462     # use the ssh:// form.
463     $repo_location = 'remote';
464   } elsif ($repo =~ m{^\.*/}) {
465     # $repo is a local path to a git index. We'll also resolve ../foo
466     # to ../foo/.git if the latter is a directory. To help
467     # disambiguate local paths from named hosted repositories, this
468     # form must be given as ./ or ../ if it's a relative path.
469     if (-d "$repo/.git") {
470       $repo = "$repo/.git";
471     }
472     $repo_location = 'local';
473   } else {
474     # $repo is none of the above. It must be the name of a hosted
475     # repository.
476     my $arv_repo_list = api_call("repositories/list",
477                                  'filters' => [['name','=',$repo]]);
478     my @repos_found = @{$arv_repo_list->{'items'}};
479     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
480     if ($n_found > 0) {
481       Log(undef, "Repository '$repo' -> "
482           . join(", ", map { $_->{'uuid'} } @repos_found));
483     }
484     if ($n_found != 1) {
485       croak("Error: Found $n_found repositories with name '$repo'.");
486     }
487     $repo = $repos_found[0]->{'fetch_url'};
488     $repo_location = 'remote';
489   }
490   Log(undef, "Using $repo_location repository '$repo'");
491   $ENV{"CRUNCH_SRC_URL"} = $repo;
492
493   # Resolve given script_version (we'll call that $treeish here) to a
494   # commit sha1 ($commit).
495   my $treeish = $Job->{'script_version'};
496   my $commit;
497   if ($repo_location eq 'remote') {
498     # We minimize excess object-fetching by re-using the same bare
499     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
500     # just keep adding remotes to it as needed.
501     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
502     my $gitcmd = "git --git-dir=\Q$local_repo\E";
503
504     # Set up our local repo for caching remote objects, making
505     # archives, etc.
506     if (!-d $local_repo) {
507       make_path($local_repo) or croak("Error: could not create $local_repo");
508     }
509     # This works (exits 0 and doesn't delete fetched objects) even
510     # if $local_repo is already initialized:
511     `$gitcmd init --bare`;
512     if ($?) {
513       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
514     }
515
516     # If $treeish looks like a hash (or abbrev hash) we look it up in
517     # our local cache first, since that's cheaper. (We don't want to
518     # do that with tags/branches though -- those change over time, so
519     # they should always be resolved by the remote repo.)
520     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
521       # Hide stderr because it's normal for this to fail:
522       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
523       if ($? == 0 &&
524           # Careful not to resolve a branch named abcdeff to commit 1234567:
525           $sha1 =~ /^$treeish/ &&
526           $sha1 =~ /^([0-9a-f]{40})$/s) {
527         $commit = $1;
528         Log(undef, "Commit $commit already present in $local_repo");
529       }
530     }
531
532     if (!defined $commit) {
533       # If $treeish isn't just a hash or abbrev hash, or isn't here
534       # yet, we need to fetch the remote to resolve it correctly.
535
536       # First, remove all local heads. This prevents a name that does
537       # not exist on the remote from resolving to (or colliding with)
538       # a previously fetched branch or tag (possibly from a different
539       # remote).
540       remove_tree("$local_repo/refs/heads", {keep_root => 1});
541
542       Log(undef, "Fetching objects from $repo to $local_repo");
543       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
544       if ($?) {
545         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
546       }
547     }
548
549     # Now that the data is all here, we will use our local repo for
550     # the rest of our git activities.
551     $repo = $local_repo;
552   }
553
554   my $gitcmd = "git --git-dir=\Q$repo\E";
555   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
556   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
557     croak("`$gitcmd rev-list` exited "
558           .exit_status_s($?)
559           .", '$treeish' not found. Giving up.");
560   }
561   $commit = $1;
562   Log(undef, "Version $treeish is commit $commit");
563
564   if ($commit ne $Job->{'script_version'}) {
565     # Record the real commit id in the database, frozentokey, logs,
566     # etc. -- instead of an abbreviation or a branch name which can
567     # become ambiguous or point to a different commit in the future.
568     if (!$Job->update_attributes('script_version' => $commit)) {
569       croak("Error: failed to update job's script_version attribute");
570     }
571   }
572
573   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
574   add_git_archive("$gitcmd archive ''\Q$commit\E");
575 }
576
577 my $git_archive = combined_git_archive();
578 if (!defined $git_archive) {
579   Log(undef, "Skip install phase (no git archive)");
580   if ($have_slurm) {
581     Log(undef, "Warning: This probably means workers have no source tree!");
582   }
583 }
584 else {
585   Log(undef, "Run install script on all workers");
586
587   my @srunargs = ("srun",
588                   "--nodelist=$nodelist",
589                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
590   my @execargs = ("sh", "-c",
591                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
592
593   my $installpid = fork();
594   if ($installpid == 0)
595   {
596     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
597     exit (1);
598   }
599   while (1)
600   {
601     last if $installpid == waitpid (-1, WNOHANG);
602     freeze_if_want_freeze ($installpid);
603     select (undef, undef, undef, 0.1);
604   }
605   my $install_exited = $?;
606   Log (undef, "Install script exited ".exit_status_s($install_exited));
607   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
608     unlink($tar_filename);
609   }
610   exit (1) if $install_exited != 0;
611 }
612
613 foreach (qw (script script_version script_parameters runtime_constraints))
614 {
615   Log (undef,
616        "$_ " .
617        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
618 }
619 foreach (split (/\n/, $Job->{knobs}))
620 {
621   Log (undef, "knob " . $_);
622 }
623
624
625
626 $main::success = undef;
627
628
629
630 ONELEVEL:
631
632 my $thisround_succeeded = 0;
633 my $thisround_failed = 0;
634 my $thisround_failed_multiple = 0;
635
636 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
637                        or $a <=> $b } @jobstep_todo;
638 my $level = $jobstep[$jobstep_todo[0]]->{level};
639 Log (undef, "start level $level");
640
641
642
643 my %proc;
644 my @freeslot = (0..$#slot);
645 my @holdslot;
646 my %reader;
647 my $progress_is_dirty = 1;
648 my $progress_stats_updated = 0;
649
650 update_progress_stats();
651
652
653
654 THISROUND:
655 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
656 {
657   my $id = $jobstep_todo[$todo_ptr];
658   my $Jobstep = $jobstep[$id];
659   if ($Jobstep->{level} != $level)
660   {
661     next;
662   }
663
664   pipe $reader{$id}, "writer" or croak ($!);
665   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
666   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
667
668   my $childslot = $freeslot[0];
669   my $childnode = $slot[$childslot]->{node};
670   my $childslotname = join (".",
671                             $slot[$childslot]->{node}->{name},
672                             $slot[$childslot]->{cpu});
673   my $childpid = fork();
674   if ($childpid == 0)
675   {
676     $SIG{'INT'} = 'DEFAULT';
677     $SIG{'QUIT'} = 'DEFAULT';
678     $SIG{'TERM'} = 'DEFAULT';
679
680     foreach (values (%reader))
681     {
682       close($_);
683     }
684     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
685     open(STDOUT,">&writer");
686     open(STDERR,">&writer");
687
688     undef $dbh;
689     undef $sth;
690
691     delete $ENV{"GNUPGHOME"};
692     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
693     $ENV{"TASK_QSEQUENCE"} = $id;
694     $ENV{"TASK_SEQUENCE"} = $level;
695     $ENV{"JOB_SCRIPT"} = $Job->{script};
696     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
697       $param =~ tr/a-z/A-Z/;
698       $ENV{"JOB_PARAMETER_$param"} = $value;
699     }
700     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
701     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
702     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
703     $ENV{"HOME"} = $ENV{"TASK_WORK"};
704     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
705     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
706     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
707     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
708
709     $ENV{"GZIP"} = "-n";
710
711     my @srunargs = (
712       "srun",
713       "--nodelist=".$childnode->{name},
714       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
715       "--job-name=$job_id.$id.$$",
716         );
717     my $command =
718         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
719         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
720         ."&& cd $ENV{CRUNCH_TMP} ";
721     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
722     if ($docker_hash)
723     {
724       $Jobstep->{cidfile} = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}-$Jobstep->{failures}.cid";
725       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$Jobstep->{cidfile} -poll=10000 ";
726       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$Jobstep->{cidfile} --sig-proxy ";
727
728       # Dynamically configure the container to use the host system as its
729       # DNS server.  Get the host's global addresses from the ip command,
730       # and turn them into docker --dns options using gawk.
731       $command .=
732           q{$(ip -o address show scope global |
733               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
734
735       # The source tree and $destdir directory (which we have
736       # installed on the worker host) are available in the container,
737       # under the same path.
738       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
739       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
740
741       # Currently, we make arv-mount's mount point appear at /keep
742       # inside the container (instead of using the same path as the
743       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
744       # crunch scripts and utilities must not rely on this. They must
745       # use $TASK_KEEPMOUNT.
746       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
747       $ENV{TASK_KEEPMOUNT} = "/keep";
748
749       # TASK_WORK is almost exactly like a docker data volume: it
750       # starts out empty, is writable, and persists until no
751       # containers use it any more. We don't use --volumes-from to
752       # share it with other containers: it is only accessible to this
753       # task, and it goes away when this task stops.
754       #
755       # However, a docker data volume is writable only by root unless
756       # the mount point already happens to exist in the container with
757       # different permissions. Therefore, we [1] assume /tmp already
758       # exists in the image and is writable by the crunch user; [2]
759       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
760       # writable if they are created by docker while setting up the
761       # other --volumes); and [3] create $TASK_WORK inside the
762       # container using $build_script.
763       $command .= "--volume=/tmp ";
764       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
765       $ENV{"HOME"} = $ENV{"TASK_WORK"};
766       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
767
768       # TODO: Share a single JOB_WORK volume across all task
769       # containers on a given worker node, and delete it when the job
770       # ends (and, in case that doesn't work, when the next job
771       # starts).
772       #
773       # For now, use the same approach as TASK_WORK above.
774       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
775
776       while (my ($env_key, $env_val) = each %ENV)
777       {
778         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
779           $command .= "--env=\Q$env_key=$env_val\E ";
780         }
781       }
782       $command .= "--env=\QHOME=$ENV{HOME}\E ";
783       $command .= "\Q$docker_hash\E ";
784       $command .= "stdbuf --output=0 --error=0 ";
785       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
786     } else {
787       # Non-docker run
788       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
789       $command .= "stdbuf --output=0 --error=0 ";
790       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
791     }
792
793     my @execargs = ('bash', '-c', $command);
794     srun (\@srunargs, \@execargs, undef, $build_script);
795     # exec() failed, we assume nothing happened.
796     die "srun() failed on build script\n";
797   }
798   close("writer");
799   if (!defined $childpid)
800   {
801     close $reader{$id};
802     delete $reader{$id};
803     next;
804   }
805   shift @freeslot;
806   $proc{$childpid} = { jobstep => $id,
807                        time => time,
808                        slot => $childslot,
809                        jobstepname => "$job_id.$id.$childpid",
810                      };
811   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
812   $slot[$childslot]->{pid} = $childpid;
813
814   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
815   Log ($id, "child $childpid started on $childslotname");
816   $Jobstep->{starttime} = time;
817   $Jobstep->{node} = $childnode->{name};
818   $Jobstep->{slotindex} = $childslot;
819   delete $Jobstep->{stderr};
820   delete $Jobstep->{finishtime};
821
822   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
823   $Jobstep->{'arvados_task'}->save;
824
825   splice @jobstep_todo, $todo_ptr, 1;
826   --$todo_ptr;
827
828   $progress_is_dirty = 1;
829
830   while (!@freeslot
831          ||
832          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
833   {
834     last THISROUND if $main::please_freeze || defined($main::success);
835     if ($main::please_info)
836     {
837       $main::please_info = 0;
838       freeze();
839       create_output_collection();
840       save_meta(1);
841       update_progress_stats();
842     }
843     my $gotsome
844         = readfrompipes ()
845         + reapchildren ();
846     if (!$gotsome)
847     {
848       check_refresh_wanted();
849       check_squeue();
850       update_progress_stats();
851       select (undef, undef, undef, 0.1);
852     }
853     elsif (time - $progress_stats_updated >= 30)
854     {
855       update_progress_stats();
856     }
857     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
858         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
859     {
860       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
861           .($thisround_failed+$thisround_succeeded)
862           .") -- giving up on this round";
863       Log (undef, $message);
864       last THISROUND;
865     }
866
867     # move slots from freeslot to holdslot (or back to freeslot) if necessary
868     for (my $i=$#freeslot; $i>=0; $i--) {
869       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
870         push @holdslot, (splice @freeslot, $i, 1);
871       }
872     }
873     for (my $i=$#holdslot; $i>=0; $i--) {
874       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
875         push @freeslot, (splice @holdslot, $i, 1);
876       }
877     }
878
879     # give up if no nodes are succeeding
880     if (!grep { $_->{node}->{losing_streak} == 0 &&
881                     $_->{node}->{hold_count} < 4 } @slot) {
882       my $message = "Every node has failed -- giving up on this round";
883       Log (undef, $message);
884       last THISROUND;
885     }
886   }
887 }
888
889
890 push @freeslot, splice @holdslot;
891 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
892
893
894 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
895 while (%proc)
896 {
897   if ($main::please_continue) {
898     $main::please_continue = 0;
899     goto THISROUND;
900   }
901   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
902   readfrompipes ();
903   if (!reapchildren())
904   {
905     check_refresh_wanted();
906     check_squeue();
907     update_progress_stats();
908     select (undef, undef, undef, 0.1);
909     killem (keys %proc) if $main::please_freeze;
910   }
911 }
912
913 update_progress_stats();
914 freeze_if_want_freeze();
915
916
917 if (!defined $main::success)
918 {
919   if (@jobstep_todo &&
920       $thisround_succeeded == 0 &&
921       ($thisround_failed == 0 || $thisround_failed > 4))
922   {
923     my $message = "stop because $thisround_failed tasks failed and none succeeded";
924     Log (undef, $message);
925     $main::success = 0;
926   }
927   if (!@jobstep_todo)
928   {
929     $main::success = 1;
930   }
931 }
932
933 goto ONELEVEL if !defined $main::success;
934
935
936 release_allocation();
937 freeze();
938 my $collated_output = &create_output_collection();
939
940 if (!$collated_output) {
941   Log (undef, "Failed to write output collection");
942 }
943 else {
944   Log(undef, "job output $collated_output");
945   $Job->update_attributes('output' => $collated_output);
946 }
947
948 Log (undef, "finish");
949
950 save_meta();
951
952 my $final_state;
953 if ($collated_output && $main::success) {
954   $final_state = 'Complete';
955 } else {
956   $final_state = 'Failed';
957 }
958 $Job->update_attributes('state' => $final_state);
959
960 exit (($final_state eq 'Complete') ? 0 : 1);
961
962
963
964 sub update_progress_stats
965 {
966   $progress_stats_updated = time;
967   return if !$progress_is_dirty;
968   my ($todo, $done, $running) = (scalar @jobstep_todo,
969                                  scalar @jobstep_done,
970                                  scalar @slot - scalar @freeslot - scalar @holdslot);
971   $Job->{'tasks_summary'} ||= {};
972   $Job->{'tasks_summary'}->{'todo'} = $todo;
973   $Job->{'tasks_summary'}->{'done'} = $done;
974   $Job->{'tasks_summary'}->{'running'} = $running;
975   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
976   Log (undef, "status: $done done, $running running, $todo todo");
977   $progress_is_dirty = 0;
978 }
979
980
981
982 sub reapchildren
983 {
984   my $pid = waitpid (-1, WNOHANG);
985   return 0 if $pid <= 0;
986
987   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
988                   . "."
989                   . $slot[$proc{$pid}->{slot}]->{cpu});
990   my $jobstepid = $proc{$pid}->{jobstep};
991   my $elapsed = time - $proc{$pid}->{time};
992   my $Jobstep = $jobstep[$jobstepid];
993
994   my $childstatus = $?;
995   my $exitvalue = $childstatus >> 8;
996   my $exitinfo = "exit ".exit_status_s($childstatus);
997   $Jobstep->{'arvados_task'}->reload;
998   my $task_success = $Jobstep->{'arvados_task'}->{success};
999
1000   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1001
1002   if (!defined $task_success) {
1003     # task did not indicate one way or the other --> fail
1004     $Jobstep->{'arvados_task'}->{success} = 0;
1005     $Jobstep->{'arvados_task'}->save;
1006     $task_success = 0;
1007   }
1008
1009   if (!$task_success)
1010   {
1011     my $temporary_fail;
1012     $temporary_fail ||= $Jobstep->{node_fail};
1013     $temporary_fail ||= ($exitvalue == 111);
1014
1015     ++$thisround_failed;
1016     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1017
1018     # Check for signs of a failed or misconfigured node
1019     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1020         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1021       # Don't count this against jobstep failure thresholds if this
1022       # node is already suspected faulty and srun exited quickly
1023       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1024           $elapsed < 5) {
1025         Log ($jobstepid, "blaming failure on suspect node " .
1026              $slot[$proc{$pid}->{slot}]->{node}->{name});
1027         $temporary_fail ||= 1;
1028       }
1029       ban_node_by_slot($proc{$pid}->{slot});
1030     }
1031
1032     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1033                              ++$Jobstep->{'failures'},
1034                              $temporary_fail ? 'temporary ' : 'permanent',
1035                              $elapsed));
1036
1037     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1038       # Give up on this task, and the whole job
1039       $main::success = 0;
1040     }
1041     # Put this task back on the todo queue
1042     push @jobstep_todo, $jobstepid;
1043     $Job->{'tasks_summary'}->{'failed'}++;
1044   }
1045   else
1046   {
1047     ++$thisround_succeeded;
1048     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1049     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1050     push @jobstep_done, $jobstepid;
1051     Log ($jobstepid, "success in $elapsed seconds");
1052   }
1053   $Jobstep->{exitcode} = $childstatus;
1054   $Jobstep->{finishtime} = time;
1055   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1056   $Jobstep->{'arvados_task'}->save;
1057   process_stderr ($jobstepid, $task_success);
1058   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1059                            length($Jobstep->{'arvados_task'}->{output}),
1060                            $Jobstep->{'arvados_task'}->{output}));
1061
1062   close $reader{$jobstepid};
1063   delete $reader{$jobstepid};
1064   delete $slot[$proc{$pid}->{slot}]->{pid};
1065   push @freeslot, $proc{$pid}->{slot};
1066   delete $proc{$pid};
1067
1068   if (defined($Jobstep->{cidfile})) {
1069     unlink $Jobstep->{cidfile};
1070     delete $Jobstep->{cidfile};
1071   }
1072
1073   if ($task_success) {
1074     # Load new tasks
1075     my $newtask_list = [];
1076     my $newtask_results;
1077     do {
1078       $newtask_results = api_call(
1079         "job_tasks/list",
1080         'where' => {
1081           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1082         },
1083         'order' => 'qsequence',
1084         'offset' => scalar(@$newtask_list),
1085       );
1086       push(@$newtask_list, @{$newtask_results->{items}});
1087     } while (@{$newtask_results->{items}});
1088     foreach my $arvados_task (@$newtask_list) {
1089       my $jobstep = {
1090         'level' => $arvados_task->{'sequence'},
1091         'failures' => 0,
1092         'arvados_task' => $arvados_task
1093       };
1094       push @jobstep, $jobstep;
1095       push @jobstep_todo, $#jobstep;
1096     }
1097   }
1098
1099   $progress_is_dirty = 1;
1100   1;
1101 }
1102
1103 sub check_refresh_wanted
1104 {
1105   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1106   if (@stat && $stat[9] > $latest_refresh) {
1107     $latest_refresh = scalar time;
1108     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1109     for my $attr ('cancelled_at',
1110                   'cancelled_by_user_uuid',
1111                   'cancelled_by_client_uuid',
1112                   'state') {
1113       $Job->{$attr} = $Job2->{$attr};
1114     }
1115     if ($Job->{'state'} ne "Running") {
1116       if ($Job->{'state'} eq "Cancelled") {
1117         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1118       } else {
1119         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1120       }
1121       $main::success = 0;
1122       $main::please_freeze = 1;
1123     }
1124   }
1125 }
1126
1127 sub check_squeue
1128 {
1129   # return if the kill list was checked <4 seconds ago
1130   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1131   {
1132     return;
1133   }
1134   $squeue_kill_checked = time;
1135
1136   # use killem() on procs whose killtime is reached
1137   for (keys %proc)
1138   {
1139     if (exists $proc{$_}->{killtime}
1140         && $proc{$_}->{killtime} <= time)
1141     {
1142       killem ($_);
1143     }
1144   }
1145
1146   # return if the squeue was checked <60 seconds ago
1147   if (defined $squeue_checked && $squeue_checked > time - 60)
1148   {
1149     return;
1150   }
1151   $squeue_checked = time;
1152
1153   if (!$have_slurm)
1154   {
1155     # here is an opportunity to check for mysterious problems with local procs
1156     return;
1157   }
1158
1159   # get a list of steps still running
1160   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1161   chop @squeue;
1162   if ($squeue[-1] ne "ok")
1163   {
1164     return;
1165   }
1166   pop @squeue;
1167
1168   # which of my jobsteps are running, according to squeue?
1169   my %ok;
1170   foreach (@squeue)
1171   {
1172     if (/^(\d+)\.(\d+) (\S+)/)
1173     {
1174       if ($1 eq $ENV{SLURM_JOBID})
1175       {
1176         $ok{$3} = 1;
1177       }
1178     }
1179   }
1180
1181   # which of my active child procs (>60s old) were not mentioned by squeue?
1182   foreach (keys %proc)
1183   {
1184     if ($proc{$_}->{time} < time - 60
1185         && !exists $ok{$proc{$_}->{jobstepname}}
1186         && !exists $proc{$_}->{killtime})
1187     {
1188       # kill this proc if it hasn't exited in 30 seconds
1189       $proc{$_}->{killtime} = time + 30;
1190     }
1191   }
1192 }
1193
1194
1195 sub release_allocation
1196 {
1197   if ($have_slurm)
1198   {
1199     Log (undef, "release job allocation");
1200     system "scancel $ENV{SLURM_JOBID}";
1201   }
1202 }
1203
1204
1205 sub readfrompipes
1206 {
1207   my $gotsome = 0;
1208   foreach my $job (keys %reader)
1209   {
1210     my $buf;
1211     while (0 < sysread ($reader{$job}, $buf, 8192))
1212     {
1213       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1214       $jobstep[$job]->{stderr} .= $buf;
1215       preprocess_stderr ($job);
1216       if (length ($jobstep[$job]->{stderr}) > 16384)
1217       {
1218         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1219       }
1220       $gotsome = 1;
1221     }
1222   }
1223   return $gotsome;
1224 }
1225
1226
1227 sub preprocess_stderr
1228 {
1229   my $job = shift;
1230
1231   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1232     my $line = $1;
1233     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1234     Log ($job, "stderr $line");
1235     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1236       # whoa.
1237       $main::please_freeze = 1;
1238     }
1239     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1240       $jobstep[$job]->{node_fail} = 1;
1241       ban_node_by_slot($jobstep[$job]->{slotindex});
1242     }
1243   }
1244 }
1245
1246
1247 sub process_stderr
1248 {
1249   my $job = shift;
1250   my $task_success = shift;
1251   preprocess_stderr ($job);
1252
1253   map {
1254     Log ($job, "stderr $_");
1255   } split ("\n", $jobstep[$job]->{stderr});
1256 }
1257
1258 sub fetch_block
1259 {
1260   my $hash = shift;
1261   my $keep;
1262   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1263     Log(undef, "fetch_block run error from arv-get $hash: $!");
1264     return undef;
1265   }
1266   my $output_block = "";
1267   while (1) {
1268     my $buf;
1269     my $bytes = sysread($keep, $buf, 1024 * 1024);
1270     if (!defined $bytes) {
1271       Log(undef, "fetch_block read error from arv-get: $!");
1272       $output_block = undef;
1273       last;
1274     } elsif ($bytes == 0) {
1275       # sysread returns 0 at the end of the pipe.
1276       last;
1277     } else {
1278       # some bytes were read into buf.
1279       $output_block .= $buf;
1280     }
1281   }
1282   close $keep;
1283   if ($?) {
1284     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1285     $output_block = undef;
1286   }
1287   return $output_block;
1288 }
1289
1290 # Create a collection by concatenating the output of all tasks (each
1291 # task's output is either a manifest fragment, a locator for a
1292 # manifest fragment stored in Keep, or nothing at all). Return the
1293 # portable_data_hash of the new collection.
1294 sub create_output_collection
1295 {
1296   Log (undef, "collate");
1297
1298   my ($child_out, $child_in);
1299   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1300 import arvados
1301 import sys
1302 print (arvados.api("v1").collections().
1303        create(body={"manifest_text": sys.stdin.read()}).
1304        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1305 }, retry_count());
1306
1307   my $task_idx = -1;
1308   my $manifest_size = 0;
1309   for (@jobstep)
1310   {
1311     ++$task_idx;
1312     my $output = $_->{'arvados_task'}->{output};
1313     next if (!defined($output));
1314     my $next_write;
1315     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1316       $next_write = fetch_block($output);
1317     } else {
1318       $next_write = $output;
1319     }
1320     if (defined($next_write)) {
1321       if (!defined(syswrite($child_in, $next_write))) {
1322         # There's been an error writing.  Stop the loop.
1323         # We'll log details about the exit code later.
1324         last;
1325       } else {
1326         $manifest_size += length($next_write);
1327       }
1328     } else {
1329       my $uuid = $_->{'arvados_task'}->{'uuid'};
1330       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1331       $main::success = 0;
1332     }
1333   }
1334   close($child_in);
1335   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1336
1337   my $joboutput;
1338   my $s = IO::Select->new($child_out);
1339   if ($s->can_read(120)) {
1340     sysread($child_out, $joboutput, 1024 * 1024);
1341     waitpid($pid, 0);
1342     if ($?) {
1343       Log(undef, "output collection creation exited " . exit_status_s($?));
1344       $joboutput = undef;
1345     } else {
1346       chomp($joboutput);
1347     }
1348   } else {
1349     Log (undef, "timed out while creating output collection");
1350     foreach my $signal (2, 2, 2, 15, 15, 9) {
1351       kill($signal, $pid);
1352       last if waitpid($pid, WNOHANG) == -1;
1353       sleep(1);
1354     }
1355   }
1356   close($child_out);
1357
1358   return $joboutput;
1359 }
1360
1361
1362 sub killem
1363 {
1364   foreach (@_)
1365   {
1366     my $sig = 2;                # SIGINT first
1367     if (exists $proc{$_}->{"sent_$sig"} &&
1368         time - $proc{$_}->{"sent_$sig"} > 4)
1369     {
1370       $sig = 15;                # SIGTERM if SIGINT doesn't work
1371     }
1372     if (exists $proc{$_}->{"sent_$sig"} &&
1373         time - $proc{$_}->{"sent_$sig"} > 4)
1374     {
1375       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1376     }
1377     if (!exists $proc{$_}->{"sent_$sig"})
1378     {
1379       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1380       kill $sig, $_;
1381       select (undef, undef, undef, 0.1);
1382       if ($sig == 2)
1383       {
1384         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1385       }
1386       $proc{$_}->{"sent_$sig"} = time;
1387       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1388     }
1389   }
1390 }
1391
1392
1393 sub fhbits
1394 {
1395   my($bits);
1396   for (@_) {
1397     vec($bits,fileno($_),1) = 1;
1398   }
1399   $bits;
1400 }
1401
1402
1403 # Send log output to Keep via arv-put.
1404 #
1405 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1406 # $log_pipe_pid is the pid of the arv-put subprocess.
1407 #
1408 # The only functions that should access these variables directly are:
1409 #
1410 # log_writer_start($logfilename)
1411 #     Starts an arv-put pipe, reading data on stdin and writing it to
1412 #     a $logfilename file in an output collection.
1413 #
1414 # log_writer_send($txt)
1415 #     Writes $txt to the output log collection.
1416 #
1417 # log_writer_finish()
1418 #     Closes the arv-put pipe and returns the output that it produces.
1419 #
1420 # log_writer_is_active()
1421 #     Returns a true value if there is currently a live arv-put
1422 #     process, false otherwise.
1423 #
1424 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1425
1426 sub log_writer_start($)
1427 {
1428   my $logfilename = shift;
1429   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1430                         'arv-put',
1431                         '--portable-data-hash',
1432                         '--project-uuid', $Job->{owner_uuid},
1433                         '--retries', '3',
1434                         '--name', $logfilename,
1435                         '--filename', $logfilename,
1436                         '-');
1437 }
1438
1439 sub log_writer_send($)
1440 {
1441   my $txt = shift;
1442   print $log_pipe_in $txt;
1443 }
1444
1445 sub log_writer_finish()
1446 {
1447   return unless $log_pipe_pid;
1448
1449   close($log_pipe_in);
1450   my $arv_put_output;
1451
1452   my $s = IO::Select->new($log_pipe_out);
1453   if ($s->can_read(120)) {
1454     sysread($log_pipe_out, $arv_put_output, 1024);
1455     chomp($arv_put_output);
1456   } else {
1457     Log (undef, "timed out reading from 'arv-put'");
1458   }
1459
1460   waitpid($log_pipe_pid, 0);
1461   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1462   if ($?) {
1463     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1464   }
1465
1466   return $arv_put_output;
1467 }
1468
1469 sub log_writer_is_active() {
1470   return $log_pipe_pid;
1471 }
1472
1473 sub Log                         # ($jobstep_id, $logmessage)
1474 {
1475   if ($_[1] =~ /\n/) {
1476     for my $line (split (/\n/, $_[1])) {
1477       Log ($_[0], $line);
1478     }
1479     return;
1480   }
1481   my $fh = select STDERR; $|=1; select $fh;
1482   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1483   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1484   $message .= "\n";
1485   my $datetime;
1486   if (log_writer_is_active() || -t STDERR) {
1487     my @gmtime = gmtime;
1488     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1489                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1490   }
1491   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1492
1493   if (log_writer_is_active()) {
1494     log_writer_send($datetime . " " . $message);
1495   }
1496 }
1497
1498
1499 sub croak
1500 {
1501   my ($package, $file, $line) = caller;
1502   my $message = "@_ at $file line $line\n";
1503   Log (undef, $message);
1504   freeze() if @jobstep_todo;
1505   create_output_collection() if @jobstep_todo;
1506   cleanup();
1507   save_meta();
1508   die;
1509 }
1510
1511
1512 sub cleanup
1513 {
1514   return unless $Job;
1515   if ($Job->{'state'} eq 'Cancelled') {
1516     $Job->update_attributes('finished_at' => scalar gmtime);
1517   } else {
1518     $Job->update_attributes('state' => 'Failed');
1519   }
1520 }
1521
1522
1523 sub save_meta
1524 {
1525   my $justcheckpoint = shift; # false if this will be the last meta saved
1526   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1527   return unless log_writer_is_active();
1528
1529   my $loglocator = log_writer_finish();
1530   Log (undef, "log manifest is $loglocator");
1531   $Job->{'log'} = $loglocator;
1532   $Job->update_attributes('log', $loglocator);
1533 }
1534
1535
1536 sub freeze_if_want_freeze
1537 {
1538   if ($main::please_freeze)
1539   {
1540     release_allocation();
1541     if (@_)
1542     {
1543       # kill some srun procs before freeze+stop
1544       map { $proc{$_} = {} } @_;
1545       while (%proc)
1546       {
1547         killem (keys %proc);
1548         select (undef, undef, undef, 0.1);
1549         my $died;
1550         while (($died = waitpid (-1, WNOHANG)) > 0)
1551         {
1552           delete $proc{$died};
1553         }
1554       }
1555     }
1556     freeze();
1557     create_output_collection();
1558     cleanup();
1559     save_meta();
1560     exit 1;
1561   }
1562 }
1563
1564
1565 sub freeze
1566 {
1567   Log (undef, "Freeze not implemented");
1568   return;
1569 }
1570
1571
1572 sub thaw
1573 {
1574   croak ("Thaw not implemented");
1575 }
1576
1577
1578 sub freezequote
1579 {
1580   my $s = shift;
1581   $s =~ s/\\/\\\\/g;
1582   $s =~ s/\n/\\n/g;
1583   return $s;
1584 }
1585
1586
1587 sub freezeunquote
1588 {
1589   my $s = shift;
1590   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1591   return $s;
1592 }
1593
1594
1595 sub srun
1596 {
1597   my $srunargs = shift;
1598   my $execargs = shift;
1599   my $opts = shift || {};
1600   my $stdin = shift;
1601   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1602
1603   $Data::Dumper::Terse = 1;
1604   $Data::Dumper::Indent = 0;
1605   my $show_cmd = Dumper($args);
1606   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1607   $show_cmd =~ s/\n/ /g;
1608   warn "starting: $show_cmd\n";
1609
1610   if (defined $stdin) {
1611     my $child = open STDIN, "-|";
1612     defined $child or die "no fork: $!";
1613     if ($child == 0) {
1614       print $stdin or die $!;
1615       close STDOUT or die $!;
1616       exit 0;
1617     }
1618   }
1619
1620   return system (@$args) if $opts->{fork};
1621
1622   exec @$args;
1623   warn "ENV size is ".length(join(" ",%ENV));
1624   die "exec failed: $!: @$args";
1625 }
1626
1627
1628 sub ban_node_by_slot {
1629   # Don't start any new jobsteps on this node for 60 seconds
1630   my $slotid = shift;
1631   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1632   $slot[$slotid]->{node}->{hold_count}++;
1633   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1634 }
1635
1636 sub must_lock_now
1637 {
1638   my ($lockfile, $error_message) = @_;
1639   open L, ">", $lockfile or croak("$lockfile: $!");
1640   if (!flock L, LOCK_EX|LOCK_NB) {
1641     croak("Can't lock $lockfile: $error_message\n");
1642   }
1643 }
1644
1645 sub find_docker_image {
1646   # Given a Keep locator, check to see if it contains a Docker image.
1647   # If so, return its stream name and Docker hash.
1648   # If not, return undef for both values.
1649   my $locator = shift;
1650   my ($streamname, $filename);
1651   my $image = api_call("collections/get", uuid => $locator);
1652   if ($image) {
1653     foreach my $line (split(/\n/, $image->{manifest_text})) {
1654       my @tokens = split(/\s+/, $line);
1655       next if (!@tokens);
1656       $streamname = shift(@tokens);
1657       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1658         if (defined($filename)) {
1659           return (undef, undef);  # More than one file in the Collection.
1660         } else {
1661           $filename = (split(/:/, $filedata, 3))[2];
1662         }
1663       }
1664     }
1665   }
1666   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1667     return ($streamname, $1);
1668   } else {
1669     return (undef, undef);
1670   }
1671 }
1672
1673 sub retry_count {
1674   # Calculate the number of times an operation should be retried,
1675   # assuming exponential backoff, and that we're willing to retry as
1676   # long as tasks have been running.  Enforce a minimum of 3 retries.
1677   my ($starttime, $endtime, $timediff, $retries);
1678   if (@jobstep) {
1679     $starttime = $jobstep[0]->{starttime};
1680     $endtime = $jobstep[-1]->{finishtime};
1681   }
1682   if (!defined($starttime)) {
1683     $timediff = 0;
1684   } elsif (!defined($endtime)) {
1685     $timediff = time - $starttime;
1686   } else {
1687     $timediff = ($endtime - $starttime) - (time - $endtime);
1688   }
1689   if ($timediff > 0) {
1690     $retries = int(log($timediff) / log(2));
1691   } else {
1692     $retries = 1;  # Use the minimum.
1693   }
1694   return ($retries > 3) ? $retries : 3;
1695 }
1696
1697 sub retry_op {
1698   # Pass in two function references.
1699   # This method will be called with the remaining arguments.
1700   # If it dies, retry it with exponential backoff until it succeeds,
1701   # or until the current retry_count is exhausted.  After each failure
1702   # that can be retried, the second function will be called with
1703   # the current try count (0-based), next try time, and error message.
1704   my $operation = shift;
1705   my $retry_callback = shift;
1706   my $retries = retry_count();
1707   foreach my $try_count (0..$retries) {
1708     my $next_try = time + (2 ** $try_count);
1709     my $result = eval { $operation->(@_); };
1710     if (!$@) {
1711       return $result;
1712     } elsif ($try_count < $retries) {
1713       $retry_callback->($try_count, $next_try, $@);
1714       my $sleep_time = $next_try - time;
1715       sleep($sleep_time) if ($sleep_time > 0);
1716     }
1717   }
1718   # Ensure the error message ends in a newline, so Perl doesn't add
1719   # retry_op's line number to it.
1720   chomp($@);
1721   die($@ . "\n");
1722 }
1723
1724 sub api_call {
1725   # Pass in a /-separated API method name, and arguments for it.
1726   # This function will call that method, retrying as needed until
1727   # the current retry_count is exhausted, with a log on the first failure.
1728   my $method_name = shift;
1729   my $log_api_retry = sub {
1730     my ($try_count, $next_try_at, $errmsg) = @_;
1731     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1732     $errmsg =~ s/\s/ /g;
1733     $errmsg =~ s/\s+$//;
1734     my $retry_msg;
1735     if ($next_try_at < time) {
1736       $retry_msg = "Retrying.";
1737     } else {
1738       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1739       $retry_msg = "Retrying at $next_try_fmt.";
1740     }
1741     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1742   };
1743   my $method = $arv;
1744   foreach my $key (split(/\//, $method_name)) {
1745     $method = $method->{$key};
1746   }
1747   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1748 }
1749
1750 sub exit_status_s {
1751   # Given a $?, return a human-readable exit code string like "0" or
1752   # "1" or "0 with signal 1" or "1 with signal 11".
1753   my $exitcode = shift;
1754   my $s = $exitcode >> 8;
1755   if ($exitcode & 0x7f) {
1756     $s .= " with signal " . ($exitcode & 0x7f);
1757   }
1758   if ($exitcode & 0x80) {
1759     $s .= " with core dump";
1760   }
1761   return $s;
1762 }
1763
1764 sub handle_readall {
1765   # Pass in a glob reference to a file handle.
1766   # Read all its contents and return them as a string.
1767   my $fh_glob_ref = shift;
1768   local $/ = undef;
1769   return <$fh_glob_ref>;
1770 }
1771
1772 sub tar_filename_n {
1773   my $n = shift;
1774   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1775 }
1776
1777 sub add_git_archive {
1778   # Pass in a git archive command as a string or list, a la system().
1779   # This method will save its output to be included in the archive sent to the
1780   # build script.
1781   my $git_input;
1782   $git_tar_count++;
1783   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1784     croak("Failed to save git archive: $!");
1785   }
1786   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1787   close($git_input);
1788   waitpid($git_pid, 0);
1789   close(GIT_ARCHIVE);
1790   if ($?) {
1791     croak("Failed to save git archive: git exited " . exit_status_s($?));
1792   }
1793 }
1794
1795 sub combined_git_archive {
1796   # Combine all saved tar archives into a single archive, then return its
1797   # contents in a string.  Return undef if no archives have been saved.
1798   if ($git_tar_count < 1) {
1799     return undef;
1800   }
1801   my $base_tar_name = tar_filename_n(1);
1802   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1803     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1804     if ($tar_exit != 0) {
1805       croak("Error preparing build archive: tar -A exited " .
1806             exit_status_s($tar_exit));
1807     }
1808   }
1809   if (!open(GIT_TAR, "<", $base_tar_name)) {
1810     croak("Could not open build archive: $!");
1811   }
1812   my $tar_contents = handle_readall(\*GIT_TAR);
1813   close(GIT_TAR);
1814   return $tar_contents;
1815 }
1816
1817 __DATA__
1818 #!/usr/bin/perl
1819 #
1820 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1821 # server invokes this script on individual compute nodes, or localhost if we're
1822 # running a job locally.  It gets called in two modes:
1823 #
1824 # * No arguments: Installation mode.  Read a tar archive from the DATA
1825 #   file handle; it includes the Crunch script's source code, and
1826 #   maybe SDKs as well.  Those should be installed in the proper
1827 #   locations.  This runs outside of any Docker container, so don't try to
1828 #   introspect Crunch's runtime environment.
1829 #
1830 # * With arguments: Crunch script run mode.  This script should set up the
1831 #   environment, then run the command specified in the arguments.  This runs
1832 #   inside any Docker container.
1833
1834 use Fcntl ':flock';
1835 use File::Path qw( make_path remove_tree );
1836 use POSIX qw(getcwd);
1837
1838 # Map SDK subdirectories to the path environments they belong to.
1839 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1840
1841 my $destdir = $ENV{"CRUNCH_SRC"};
1842 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1843 my $repo = $ENV{"CRUNCH_SRC_URL"};
1844 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1845 my $job_work = $ENV{"JOB_WORK"};
1846 my $task_work = $ENV{"TASK_WORK"};
1847
1848 for my $dir ($destdir, $job_work, $task_work) {
1849   if ($dir) {
1850     make_path $dir;
1851     -e $dir or die "Failed to create temporary directory ($dir): $!";
1852   }
1853 }
1854
1855 if ($task_work) {
1856   remove_tree($task_work, {keep_root => 1});
1857 }
1858
1859 open(STDOUT_ORIG, ">&", STDOUT);
1860 open(STDERR_ORIG, ">&", STDERR);
1861 open(STDOUT, ">>", "$destdir.log");
1862 open(STDERR, ">&", STDOUT);
1863
1864 ### Crunch script run mode
1865 if (@ARGV) {
1866   # We want to do routine logging during task 0 only.  This gives the user
1867   # the information they need, but avoids repeating the information for every
1868   # task.
1869   my $Log;
1870   if ($ENV{TASK_SEQUENCE} eq "0") {
1871     $Log = sub {
1872       my $msg = shift;
1873       printf STDERR_ORIG "[Crunch] $msg\n", @_;
1874     };
1875   } else {
1876     $Log = sub { };
1877   }
1878
1879   my $python_src = "$install_dir/python";
1880   my $venv_dir = "$job_work/.arvados.venv";
1881   my $venv_built = -e "$venv_dir/bin/activate";
1882   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1883     shell_or_die("virtualenv", "--quiet", "--system-site-packages",
1884                  "--python=python2.7", $venv_dir);
1885     shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1886     $venv_built = 1;
1887     $Log->("Built Python SDK virtualenv");
1888   }
1889
1890   my $pip_bin = "pip";
1891   if ($venv_built) {
1892     $Log->("Running in Python SDK virtualenv");
1893     $pip_bin = "$venv_dir/bin/pip";
1894     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
1895     @ARGV = ("/bin/sh", "-ec",
1896              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
1897   } elsif (-d $python_src) {
1898     $Log->("Warning: virtualenv not found inside Docker container default " .
1899            "\$PATH. Can't install Python SDK.");
1900   }
1901
1902   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
1903   if ($pkgs) {
1904     $Log->("Using Arvados SDK:");
1905     foreach my $line (split /\n/, $pkgs) {
1906       $Log->($line);
1907     }
1908   } else {
1909     $Log->("Arvados SDK packages not found");
1910   }
1911
1912   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
1913     my $sdk_path = "$install_dir/$sdk_dir";
1914     if (-d $sdk_path) {
1915       if ($ENV{$sdk_envkey}) {
1916         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
1917       } else {
1918         $ENV{$sdk_envkey} = $sdk_path;
1919       }
1920       $Log->("Arvados SDK added to %s", $sdk_envkey);
1921     }
1922   }
1923
1924   close(STDOUT);
1925   close(STDERR);
1926   open(STDOUT, ">&", STDOUT_ORIG);
1927   open(STDERR, ">&", STDERR_ORIG);
1928   exec(@ARGV);
1929   die "Cannot exec `@ARGV`: $!";
1930 }
1931
1932 ### Installation mode
1933 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1934 flock L, LOCK_EX;
1935 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1936   # This version already installed -> nothing to do.
1937   exit(0);
1938 }
1939
1940 unlink "$destdir.commit";
1941 mkdir $destdir;
1942
1943 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
1944   die "Error launching 'tar -xC $destdir': $!";
1945 }
1946 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
1947 # get SIGPIPE.  We must feed it data incrementally.
1948 my $tar_input;
1949 while (read(DATA, $tar_input, 65536)) {
1950   print TARX $tar_input;
1951 }
1952 if(!close(TARX)) {
1953   die "'tar -xC $destdir' exited $?: $!";
1954 }
1955
1956 mkdir $install_dir;
1957
1958 my $sdk_root = "$destdir/.arvados.sdk/sdk";
1959 if (-d $sdk_root) {
1960   foreach my $sdk_lang (("python",
1961                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
1962     if (-d "$sdk_root/$sdk_lang") {
1963       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
1964         die "Failed to install $sdk_lang SDK: $!";
1965       }
1966     }
1967   }
1968 }
1969
1970 my $python_dir = "$install_dir/python";
1971 if ((-d $python_dir) and can_run("python2.7") and
1972     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
1973   # egg_info failed, probably when it asked git for a build tag.
1974   # Specify no build tag.
1975   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
1976   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
1977   close($pysdk_cfg);
1978 }
1979
1980 if (-e "$destdir/crunch_scripts/install") {
1981     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1982 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1983     # Old version
1984     shell_or_die ("./tests/autotests.sh", $install_dir);
1985 } elsif (-e "./install.sh") {
1986     shell_or_die ("./install.sh", $install_dir);
1987 }
1988
1989 if ($commit) {
1990     unlink "$destdir.commit.new";
1991     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1992     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1993 }
1994
1995 close L;
1996
1997 sub can_run {
1998   my $command_name = shift;
1999   open(my $which, "-|", "which", $command_name);
2000   while (<$which>) { }
2001   close($which);
2002   return ($? == 0);
2003 }
2004
2005 sub shell_or_die
2006 {
2007   if ($ENV{"DEBUG"}) {
2008     print STDERR "@_\n";
2009   }
2010   if (system (@_) != 0) {
2011     my $err = $!;
2012     my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
2013     open STDERR, ">&STDERR_ORIG";
2014     system ("cat $destdir.log >&2");
2015     die "@_ failed ($err): $exitstatus";
2016   }
2017 }
2018
2019 __DATA__