5448: Fix cid file cleanup.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant EX_TEMPFAIL => 75;
100
101 $ENV{"TMPDIR"} ||= "/tmp";
102 unless (defined $ENV{"CRUNCH_TMP"}) {
103   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
104   if ($ENV{"USER"} ne "crunch" && $< != 0) {
105     # use a tmp dir unique for my uid
106     $ENV{"CRUNCH_TMP"} .= "-$<";
107   }
108 }
109
110 # Create the tmp directory if it does not exist
111 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
112   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
113 }
114
115 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
116 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
117 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
118 mkdir ($ENV{"JOB_WORK"});
119
120 my $force_unlock;
121 my $git_dir;
122 my $jobspec;
123 my $job_api_token;
124 my $no_clear_tmp;
125 my $resume_stash;
126 GetOptions('force-unlock' => \$force_unlock,
127            'git-dir=s' => \$git_dir,
128            'job=s' => \$jobspec,
129            'job-api-token=s' => \$job_api_token,
130            'no-clear-tmp' => \$no_clear_tmp,
131            'resume-stash=s' => \$resume_stash,
132     );
133
134 if (defined $job_api_token) {
135   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
136 }
137
138 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
139 my $local_job = 0;
140
141
142 $SIG{'USR1'} = sub
143 {
144   $main::ENV{CRUNCH_DEBUG} = 1;
145 };
146 $SIG{'USR2'} = sub
147 {
148   $main::ENV{CRUNCH_DEBUG} = 0;
149 };
150
151
152
153 my $arv = Arvados->new('apiVersion' => 'v1');
154
155 my $Job;
156 my $job_id;
157 my $dbh;
158 my $sth;
159 my @jobstep;
160
161 my $User = api_call("users/current");
162
163 if ($jobspec =~ /^[-a-z\d]+$/)
164 {
165   # $jobspec is an Arvados UUID, not a JSON job specification
166   $Job = api_call("jobs/get", uuid => $jobspec);
167   if (!$force_unlock) {
168     # Claim this job, and make sure nobody else does
169     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
170     if ($@) {
171       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
172       exit EX_TEMPFAIL;
173     };
174   }
175 }
176 else
177 {
178   $Job = JSON::decode_json($jobspec);
179
180   if (!$resume_stash)
181   {
182     map { croak ("No $_ specified") unless $Job->{$_} }
183     qw(script script_version script_parameters);
184   }
185
186   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
187   $Job->{'started_at'} = gmtime;
188   $Job->{'state'} = 'Running';
189
190   $Job = api_call("jobs/create", job => $Job);
191 }
192 $job_id = $Job->{'uuid'};
193
194 my $keep_logfile = $job_id . '.log.txt';
195 log_writer_start($keep_logfile);
196
197 $Job->{'runtime_constraints'} ||= {};
198 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
199 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
200
201 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
202 if ($? == 0) {
203   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
204   chomp($gem_versions);
205   chop($gem_versions);  # Closing parentheses
206 } else {
207   $gem_versions = "";
208 }
209 Log(undef,
210     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
211
212 Log (undef, "check slurm allocation");
213 my @slot;
214 my @node;
215 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
216 my @sinfo;
217 if (!$have_slurm)
218 {
219   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
220   push @sinfo, "$localcpus localhost";
221 }
222 if (exists $ENV{SLURM_NODELIST})
223 {
224   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
225 }
226 foreach (@sinfo)
227 {
228   my ($ncpus, $slurm_nodelist) = split;
229   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
230
231   my @nodelist;
232   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
233   {
234     my $nodelist = $1;
235     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
236     {
237       my $ranges = $1;
238       foreach (split (",", $ranges))
239       {
240         my ($a, $b);
241         if (/(\d+)-(\d+)/)
242         {
243           $a = $1;
244           $b = $2;
245         }
246         else
247         {
248           $a = $_;
249           $b = $_;
250         }
251         push @nodelist, map {
252           my $n = $nodelist;
253           $n =~ s/\[[-,\d]+\]/$_/;
254           $n;
255         } ($a..$b);
256       }
257     }
258     else
259     {
260       push @nodelist, $nodelist;
261     }
262   }
263   foreach my $nodename (@nodelist)
264   {
265     Log (undef, "node $nodename - $ncpus slots");
266     my $node = { name => $nodename,
267                  ncpus => $ncpus,
268                  losing_streak => 0,
269                  hold_until => 0 };
270     foreach my $cpu (1..$ncpus)
271     {
272       push @slot, { node => $node,
273                     cpu => $cpu };
274     }
275   }
276   push @node, @nodelist;
277 }
278
279
280
281 # Ensure that we get one jobstep running on each allocated node before
282 # we start overloading nodes with concurrent steps
283
284 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
285
286
287 $Job->update_attributes(
288   'tasks_summary' => { 'failed' => 0,
289                        'todo' => 1,
290                        'running' => 0,
291                        'done' => 0 });
292
293 Log (undef, "start");
294 $SIG{'INT'} = sub { $main::please_freeze = 1; };
295 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
296 $SIG{'TERM'} = \&croak;
297 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
298 $SIG{'ALRM'} = sub { $main::please_info = 1; };
299 $SIG{'CONT'} = sub { $main::please_continue = 1; };
300 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
301
302 $main::please_freeze = 0;
303 $main::please_info = 0;
304 $main::please_continue = 0;
305 $main::please_refresh = 0;
306 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
307
308 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
309 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
310 $ENV{"JOB_UUID"} = $job_id;
311
312
313 my @jobstep_todo = ();
314 my @jobstep_done = ();
315 my @jobstep_tomerge = ();
316 my $jobstep_tomerge_level = 0;
317 my $squeue_checked;
318 my $squeue_kill_checked;
319 my $latest_refresh = scalar time;
320
321
322
323 if (defined $Job->{thawedfromkey})
324 {
325   thaw ($Job->{thawedfromkey});
326 }
327 else
328 {
329   my $first_task = api_call("job_tasks/create", job_task => {
330     'job_uuid' => $Job->{'uuid'},
331     'sequence' => 0,
332     'qsequence' => 0,
333     'parameters' => {},
334   });
335   push @jobstep, { 'level' => 0,
336                    'failures' => 0,
337                    'arvados_task' => $first_task,
338                  };
339   push @jobstep_todo, 0;
340 }
341
342
343 if (!$have_slurm)
344 {
345   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
346 }
347
348 my $build_script = handle_readall(\*DATA);
349 my $nodelist = join(",", @node);
350 my $git_tar_count = 0;
351
352 if (!defined $no_clear_tmp) {
353   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
354   Log (undef, "Clean work dirs");
355
356   my $cleanpid = fork();
357   if ($cleanpid == 0)
358   {
359     # Find FUSE mounts that look like Keep mounts (the mount path has the
360     # word "keep") and unmount them.  Then clean up work directories.
361     # TODO: When #5036 is done and widely deployed, we can get rid of the
362     # regular expression and just unmount everything with type fuse.keep.
363     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
364           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
365     exit (1);
366   }
367   while (1)
368   {
369     last if $cleanpid == waitpid (-1, WNOHANG);
370     freeze_if_want_freeze ($cleanpid);
371     select (undef, undef, undef, 0.1);
372   }
373   Log (undef, "Cleanup command exited ".exit_status_s($?));
374 }
375
376 # If this job requires a Docker image, install that.
377 my $docker_bin = "/usr/bin/docker.io";
378 my ($docker_locator, $docker_stream, $docker_hash);
379 if ($docker_locator = $Job->{docker_image_locator}) {
380   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
381   if (!$docker_hash)
382   {
383     croak("No Docker image hash found from locator $docker_locator");
384   }
385   $docker_stream =~ s/^\.//;
386   my $docker_install_script = qq{
387 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
388     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
389 fi
390 };
391   my $docker_pid = fork();
392   if ($docker_pid == 0)
393   {
394     srun (["srun", "--nodelist=" . join(',', @node)],
395           ["/bin/sh", "-ec", $docker_install_script]);
396     exit ($?);
397   }
398   while (1)
399   {
400     last if $docker_pid == waitpid (-1, WNOHANG);
401     freeze_if_want_freeze ($docker_pid);
402     select (undef, undef, undef, 0.1);
403   }
404   if ($? != 0)
405   {
406     croak("Installing Docker image from $docker_locator exited "
407           .exit_status_s($?));
408   }
409
410   if ($Job->{arvados_sdk_version}) {
411     # The job also specifies an Arvados SDK version.  Add the SDKs to the
412     # tar file for the build script to install.
413     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
414                        $Job->{arvados_sdk_version}));
415     add_git_archive("git", "--git-dir=$git_dir", "archive",
416                     "--prefix=.arvados.sdk/",
417                     $Job->{arvados_sdk_version}, "sdk");
418   }
419 }
420
421 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
422   # If script_version looks like an absolute path, *and* the --git-dir
423   # argument was not given -- which implies we were not invoked by
424   # crunch-dispatch -- we will use the given path as a working
425   # directory instead of resolving script_version to a git commit (or
426   # doing anything else with git).
427   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
428   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
429 }
430 else {
431   # Resolve the given script_version to a git commit sha1. Also, if
432   # the repository is remote, clone it into our local filesystem: this
433   # ensures "git archive" will work, and is necessary to reliably
434   # resolve a symbolic script_version like "master^".
435   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
436
437   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
438
439   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
440
441   # If we're running under crunch-dispatch, it will have already
442   # pulled the appropriate source tree into its own repository, and
443   # given us that repo's path as $git_dir.
444   #
445   # If we're running a "local" job, we might have to fetch content
446   # from a remote repository.
447   #
448   # (Currently crunch-dispatch gives a local path with --git-dir, but
449   # we might as well accept URLs there too in case it changes its
450   # mind.)
451   my $repo = $git_dir || $Job->{'repository'};
452
453   # Repository can be remote or local. If remote, we'll need to fetch it
454   # to a local dir before doing `git log` et al.
455   my $repo_location;
456
457   if ($repo =~ m{://|^[^/]*:}) {
458     # $repo is a git url we can clone, like git:// or https:// or
459     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
460     # not recognized here because distinguishing that from a local
461     # path is too fragile. If you really need something strange here,
462     # use the ssh:// form.
463     $repo_location = 'remote';
464   } elsif ($repo =~ m{^\.*/}) {
465     # $repo is a local path to a git index. We'll also resolve ../foo
466     # to ../foo/.git if the latter is a directory. To help
467     # disambiguate local paths from named hosted repositories, this
468     # form must be given as ./ or ../ if it's a relative path.
469     if (-d "$repo/.git") {
470       $repo = "$repo/.git";
471     }
472     $repo_location = 'local';
473   } else {
474     # $repo is none of the above. It must be the name of a hosted
475     # repository.
476     my $arv_repo_list = api_call("repositories/list",
477                                  'filters' => [['name','=',$repo]]);
478     my @repos_found = @{$arv_repo_list->{'items'}};
479     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
480     if ($n_found > 0) {
481       Log(undef, "Repository '$repo' -> "
482           . join(", ", map { $_->{'uuid'} } @repos_found));
483     }
484     if ($n_found != 1) {
485       croak("Error: Found $n_found repositories with name '$repo'.");
486     }
487     $repo = $repos_found[0]->{'fetch_url'};
488     $repo_location = 'remote';
489   }
490   Log(undef, "Using $repo_location repository '$repo'");
491   $ENV{"CRUNCH_SRC_URL"} = $repo;
492
493   # Resolve given script_version (we'll call that $treeish here) to a
494   # commit sha1 ($commit).
495   my $treeish = $Job->{'script_version'};
496   my $commit;
497   if ($repo_location eq 'remote') {
498     # We minimize excess object-fetching by re-using the same bare
499     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
500     # just keep adding remotes to it as needed.
501     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
502     my $gitcmd = "git --git-dir=\Q$local_repo\E";
503
504     # Set up our local repo for caching remote objects, making
505     # archives, etc.
506     if (!-d $local_repo) {
507       make_path($local_repo) or croak("Error: could not create $local_repo");
508     }
509     # This works (exits 0 and doesn't delete fetched objects) even
510     # if $local_repo is already initialized:
511     `$gitcmd init --bare`;
512     if ($?) {
513       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
514     }
515
516     # If $treeish looks like a hash (or abbrev hash) we look it up in
517     # our local cache first, since that's cheaper. (We don't want to
518     # do that with tags/branches though -- those change over time, so
519     # they should always be resolved by the remote repo.)
520     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
521       # Hide stderr because it's normal for this to fail:
522       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
523       if ($? == 0 &&
524           # Careful not to resolve a branch named abcdeff to commit 1234567:
525           $sha1 =~ /^$treeish/ &&
526           $sha1 =~ /^([0-9a-f]{40})$/s) {
527         $commit = $1;
528         Log(undef, "Commit $commit already present in $local_repo");
529       }
530     }
531
532     if (!defined $commit) {
533       # If $treeish isn't just a hash or abbrev hash, or isn't here
534       # yet, we need to fetch the remote to resolve it correctly.
535
536       # First, remove all local heads. This prevents a name that does
537       # not exist on the remote from resolving to (or colliding with)
538       # a previously fetched branch or tag (possibly from a different
539       # remote).
540       remove_tree("$local_repo/refs/heads", {keep_root => 1});
541
542       Log(undef, "Fetching objects from $repo to $local_repo");
543       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
544       if ($?) {
545         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
546       }
547     }
548
549     # Now that the data is all here, we will use our local repo for
550     # the rest of our git activities.
551     $repo = $local_repo;
552   }
553
554   my $gitcmd = "git --git-dir=\Q$repo\E";
555   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
556   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
557     croak("`$gitcmd rev-list` exited "
558           .exit_status_s($?)
559           .", '$treeish' not found. Giving up.");
560   }
561   $commit = $1;
562   Log(undef, "Version $treeish is commit $commit");
563
564   if ($commit ne $Job->{'script_version'}) {
565     # Record the real commit id in the database, frozentokey, logs,
566     # etc. -- instead of an abbreviation or a branch name which can
567     # become ambiguous or point to a different commit in the future.
568     if (!$Job->update_attributes('script_version' => $commit)) {
569       croak("Error: failed to update job's script_version attribute");
570     }
571   }
572
573   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
574   add_git_archive("$gitcmd archive ''\Q$commit\E");
575 }
576
577 my $git_archive = combined_git_archive();
578 if (!defined $git_archive) {
579   Log(undef, "Skip install phase (no git archive)");
580   if ($have_slurm) {
581     Log(undef, "Warning: This probably means workers have no source tree!");
582   }
583 }
584 else {
585   Log(undef, "Run install script on all workers");
586
587   my @srunargs = ("srun",
588                   "--nodelist=$nodelist",
589                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
590   my @execargs = ("sh", "-c",
591                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
592
593   my $installpid = fork();
594   if ($installpid == 0)
595   {
596     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
597     exit (1);
598   }
599   while (1)
600   {
601     last if $installpid == waitpid (-1, WNOHANG);
602     freeze_if_want_freeze ($installpid);
603     select (undef, undef, undef, 0.1);
604   }
605   my $install_exited = $?;
606   Log (undef, "Install script exited ".exit_status_s($install_exited));
607   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
608     unlink($tar_filename);
609   }
610   exit (1) if $install_exited != 0;
611 }
612
613 foreach (qw (script script_version script_parameters runtime_constraints))
614 {
615   Log (undef,
616        "$_ " .
617        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
618 }
619 foreach (split (/\n/, $Job->{knobs}))
620 {
621   Log (undef, "knob " . $_);
622 }
623
624
625
626 $main::success = undef;
627
628
629
630 ONELEVEL:
631
632 my $thisround_succeeded = 0;
633 my $thisround_failed = 0;
634 my $thisround_failed_multiple = 0;
635
636 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
637                        or $a <=> $b } @jobstep_todo;
638 my $level = $jobstep[$jobstep_todo[0]]->{level};
639 Log (undef, "start level $level");
640
641
642
643 my %proc;
644 my @freeslot = (0..$#slot);
645 my @holdslot;
646 my %reader;
647 my $progress_is_dirty = 1;
648 my $progress_stats_updated = 0;
649
650 update_progress_stats();
651
652
653
654 THISROUND:
655 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
656 {
657   my $id = $jobstep_todo[$todo_ptr];
658   my $Jobstep = $jobstep[$id];
659   if ($Jobstep->{level} != $level)
660   {
661     next;
662   }
663
664   pipe $reader{$id}, "writer" or croak ($!);
665   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
666   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
667
668   my $childslot = $freeslot[0];
669   my $childnode = $slot[$childslot]->{node};
670   my $childslotname = join (".",
671                             $slot[$childslot]->{node}->{name},
672                             $slot[$childslot]->{cpu});
673
674   if ($docker_hash) {
675       $Jobstep->{cidfile} = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
676   }
677
678   my $childpid = fork();
679   if ($childpid == 0)
680   {
681     $SIG{'INT'} = 'DEFAULT';
682     $SIG{'QUIT'} = 'DEFAULT';
683     $SIG{'TERM'} = 'DEFAULT';
684
685     foreach (values (%reader))
686     {
687       close($_);
688     }
689     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
690     open(STDOUT,">&writer");
691     open(STDERR,">&writer");
692
693     undef $dbh;
694     undef $sth;
695
696     delete $ENV{"GNUPGHOME"};
697     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
698     $ENV{"TASK_QSEQUENCE"} = $id;
699     $ENV{"TASK_SEQUENCE"} = $level;
700     $ENV{"JOB_SCRIPT"} = $Job->{script};
701     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
702       $param =~ tr/a-z/A-Z/;
703       $ENV{"JOB_PARAMETER_$param"} = $value;
704     }
705     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
706     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
707     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
708     $ENV{"HOME"} = $ENV{"TASK_WORK"};
709     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
710     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
711     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
712     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
713
714     $ENV{"GZIP"} = "-n";
715
716     my @srunargs = (
717       "srun",
718       "--nodelist=".$childnode->{name},
719       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
720       "--job-name=$job_id.$id.$$",
721         );
722     my $command =
723         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
724         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
725         ."&& cd $ENV{CRUNCH_TMP} ";
726     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
727     if ($docker_hash)
728     {
729       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$Jobstep->{cidfile} -poll=10000 ";
730       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$Jobstep->{cidfile} --sig-proxy ";
731
732       # Dynamically configure the container to use the host system as its
733       # DNS server.  Get the host's global addresses from the ip command,
734       # and turn them into docker --dns options using gawk.
735       $command .=
736           q{$(ip -o address show scope global |
737               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
738
739       # The source tree and $destdir directory (which we have
740       # installed on the worker host) are available in the container,
741       # under the same path.
742       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
743       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
744
745       # Currently, we make arv-mount's mount point appear at /keep
746       # inside the container (instead of using the same path as the
747       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
748       # crunch scripts and utilities must not rely on this. They must
749       # use $TASK_KEEPMOUNT.
750       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
751       $ENV{TASK_KEEPMOUNT} = "/keep";
752
753       # TASK_WORK is almost exactly like a docker data volume: it
754       # starts out empty, is writable, and persists until no
755       # containers use it any more. We don't use --volumes-from to
756       # share it with other containers: it is only accessible to this
757       # task, and it goes away when this task stops.
758       #
759       # However, a docker data volume is writable only by root unless
760       # the mount point already happens to exist in the container with
761       # different permissions. Therefore, we [1] assume /tmp already
762       # exists in the image and is writable by the crunch user; [2]
763       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
764       # writable if they are created by docker while setting up the
765       # other --volumes); and [3] create $TASK_WORK inside the
766       # container using $build_script.
767       $command .= "--volume=/tmp ";
768       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
769       $ENV{"HOME"} = $ENV{"TASK_WORK"};
770       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
771
772       # TODO: Share a single JOB_WORK volume across all task
773       # containers on a given worker node, and delete it when the job
774       # ends (and, in case that doesn't work, when the next job
775       # starts).
776       #
777       # For now, use the same approach as TASK_WORK above.
778       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
779
780       while (my ($env_key, $env_val) = each %ENV)
781       {
782         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
783           $command .= "--env=\Q$env_key=$env_val\E ";
784         }
785       }
786       $command .= "--env=\QHOME=$ENV{HOME}\E ";
787       $command .= "\Q$docker_hash\E ";
788       $command .= "stdbuf --output=0 --error=0 ";
789       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
790     } else {
791       # Non-docker run
792       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
793       $command .= "stdbuf --output=0 --error=0 ";
794       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
795     }
796
797     my @execargs = ('bash', '-c', $command);
798     srun (\@srunargs, \@execargs, undef, $build_script);
799     # exec() failed, we assume nothing happened.
800     die "srun() failed on build script\n";
801   }
802   close("writer");
803   if (!defined $childpid)
804   {
805     close $reader{$id};
806     delete $reader{$id};
807     next;
808   }
809   shift @freeslot;
810   $proc{$childpid} = { jobstep => $id,
811                        time => time,
812                        slot => $childslot,
813                        jobstepname => "$job_id.$id.$childpid",
814                      };
815   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
816   $slot[$childslot]->{pid} = $childpid;
817
818   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
819   Log ($id, "child $childpid started on $childslotname");
820   $Jobstep->{starttime} = time;
821   $Jobstep->{node} = $childnode->{name};
822   $Jobstep->{slotindex} = $childslot;
823   delete $Jobstep->{stderr};
824   delete $Jobstep->{finishtime};
825
826   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
827   $Jobstep->{'arvados_task'}->save;
828
829   splice @jobstep_todo, $todo_ptr, 1;
830   --$todo_ptr;
831
832   $progress_is_dirty = 1;
833
834   while (!@freeslot
835          ||
836          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
837   {
838     last THISROUND if $main::please_freeze || defined($main::success);
839     if ($main::please_info)
840     {
841       $main::please_info = 0;
842       freeze();
843       create_output_collection();
844       save_meta(1);
845       update_progress_stats();
846     }
847     my $gotsome
848         = readfrompipes ()
849         + reapchildren ();
850     if (!$gotsome)
851     {
852       check_refresh_wanted();
853       check_squeue();
854       update_progress_stats();
855       select (undef, undef, undef, 0.1);
856     }
857     elsif (time - $progress_stats_updated >= 30)
858     {
859       update_progress_stats();
860     }
861     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
862         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
863     {
864       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
865           .($thisround_failed+$thisround_succeeded)
866           .") -- giving up on this round";
867       Log (undef, $message);
868       last THISROUND;
869     }
870
871     # move slots from freeslot to holdslot (or back to freeslot) if necessary
872     for (my $i=$#freeslot; $i>=0; $i--) {
873       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
874         push @holdslot, (splice @freeslot, $i, 1);
875       }
876     }
877     for (my $i=$#holdslot; $i>=0; $i--) {
878       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
879         push @freeslot, (splice @holdslot, $i, 1);
880       }
881     }
882
883     # give up if no nodes are succeeding
884     if (!grep { $_->{node}->{losing_streak} == 0 &&
885                     $_->{node}->{hold_count} < 4 } @slot) {
886       my $message = "Every node has failed -- giving up on this round";
887       Log (undef, $message);
888       last THISROUND;
889     }
890   }
891 }
892
893
894 push @freeslot, splice @holdslot;
895 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
896
897
898 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
899 while (%proc)
900 {
901   if ($main::please_continue) {
902     $main::please_continue = 0;
903     goto THISROUND;
904   }
905   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
906   readfrompipes ();
907   if (!reapchildren())
908   {
909     check_refresh_wanted();
910     check_squeue();
911     update_progress_stats();
912     select (undef, undef, undef, 0.1);
913     killem (keys %proc) if $main::please_freeze;
914   }
915 }
916
917 update_progress_stats();
918 freeze_if_want_freeze();
919
920
921 if (!defined $main::success)
922 {
923   if (@jobstep_todo &&
924       $thisround_succeeded == 0 &&
925       ($thisround_failed == 0 || $thisround_failed > 4))
926   {
927     my $message = "stop because $thisround_failed tasks failed and none succeeded";
928     Log (undef, $message);
929     $main::success = 0;
930   }
931   if (!@jobstep_todo)
932   {
933     $main::success = 1;
934   }
935 }
936
937 goto ONELEVEL if !defined $main::success;
938
939
940 release_allocation();
941 freeze();
942 my $collated_output = &create_output_collection();
943
944 if (!$collated_output) {
945   Log (undef, "Failed to write output collection");
946 }
947 else {
948   Log(undef, "job output $collated_output");
949   $Job->update_attributes('output' => $collated_output);
950 }
951
952 Log (undef, "finish");
953
954 save_meta();
955
956 my $final_state;
957 if ($collated_output && $main::success) {
958   $final_state = 'Complete';
959 } else {
960   $final_state = 'Failed';
961 }
962 $Job->update_attributes('state' => $final_state);
963
964 exit (($final_state eq 'Complete') ? 0 : 1);
965
966
967
968 sub update_progress_stats
969 {
970   $progress_stats_updated = time;
971   return if !$progress_is_dirty;
972   my ($todo, $done, $running) = (scalar @jobstep_todo,
973                                  scalar @jobstep_done,
974                                  scalar @slot - scalar @freeslot - scalar @holdslot);
975   $Job->{'tasks_summary'} ||= {};
976   $Job->{'tasks_summary'}->{'todo'} = $todo;
977   $Job->{'tasks_summary'}->{'done'} = $done;
978   $Job->{'tasks_summary'}->{'running'} = $running;
979   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
980   Log (undef, "status: $done done, $running running, $todo todo");
981   $progress_is_dirty = 0;
982 }
983
984
985
986 sub reapchildren
987 {
988   my $pid = waitpid (-1, WNOHANG);
989   return 0 if $pid <= 0;
990
991   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
992                   . "."
993                   . $slot[$proc{$pid}->{slot}]->{cpu});
994   my $jobstepid = $proc{$pid}->{jobstep};
995   my $elapsed = time - $proc{$pid}->{time};
996   my $Jobstep = $jobstep[$jobstepid];
997
998   my $childstatus = $?;
999   my $exitvalue = $childstatus >> 8;
1000   my $exitinfo = "exit ".exit_status_s($childstatus);
1001   $Jobstep->{'arvados_task'}->reload;
1002   my $task_success = $Jobstep->{'arvados_task'}->{success};
1003
1004   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1005
1006   if (!defined $task_success) {
1007     # task did not indicate one way or the other --> fail
1008     $Jobstep->{'arvados_task'}->{success} = 0;
1009     $Jobstep->{'arvados_task'}->save;
1010     $task_success = 0;
1011   }
1012
1013   if (!$task_success)
1014   {
1015     my $temporary_fail;
1016     $temporary_fail ||= $Jobstep->{node_fail};
1017     $temporary_fail ||= ($exitvalue == 111);
1018
1019     ++$thisround_failed;
1020     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1021
1022     # Check for signs of a failed or misconfigured node
1023     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1024         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1025       # Don't count this against jobstep failure thresholds if this
1026       # node is already suspected faulty and srun exited quickly
1027       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1028           $elapsed < 5) {
1029         Log ($jobstepid, "blaming failure on suspect node " .
1030              $slot[$proc{$pid}->{slot}]->{node}->{name});
1031         $temporary_fail ||= 1;
1032       }
1033       ban_node_by_slot($proc{$pid}->{slot});
1034     }
1035
1036     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1037                              ++$Jobstep->{'failures'},
1038                              $temporary_fail ? 'temporary ' : 'permanent',
1039                              $elapsed));
1040
1041     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1042       # Give up on this task, and the whole job
1043       $main::success = 0;
1044     }
1045     # Put this task back on the todo queue
1046     push @jobstep_todo, $jobstepid;
1047     $Job->{'tasks_summary'}->{'failed'}++;
1048   }
1049   else
1050   {
1051     ++$thisround_succeeded;
1052     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1053     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1054     push @jobstep_done, $jobstepid;
1055     Log ($jobstepid, "success in $elapsed seconds");
1056   }
1057   $Jobstep->{exitcode} = $childstatus;
1058   $Jobstep->{finishtime} = time;
1059   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1060   $Jobstep->{'arvados_task'}->save;
1061   process_stderr ($jobstepid, $task_success);
1062   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1063                            length($Jobstep->{'arvados_task'}->{output}),
1064                            $Jobstep->{'arvados_task'}->{output}));
1065
1066   close $reader{$jobstepid};
1067   delete $reader{$jobstepid};
1068   delete $slot[$proc{$pid}->{slot}]->{pid};
1069   push @freeslot, $proc{$pid}->{slot};
1070   delete $proc{$pid};
1071
1072   if (defined($Jobstep->{cidfile})) {
1073     unlink $Jobstep->{cidfile};
1074     delete $Jobstep->{cidfile};
1075   }
1076
1077   if ($task_success) {
1078     # Load new tasks
1079     my $newtask_list = [];
1080     my $newtask_results;
1081     do {
1082       $newtask_results = api_call(
1083         "job_tasks/list",
1084         'where' => {
1085           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1086         },
1087         'order' => 'qsequence',
1088         'offset' => scalar(@$newtask_list),
1089       );
1090       push(@$newtask_list, @{$newtask_results->{items}});
1091     } while (@{$newtask_results->{items}});
1092     foreach my $arvados_task (@$newtask_list) {
1093       my $jobstep = {
1094         'level' => $arvados_task->{'sequence'},
1095         'failures' => 0,
1096         'arvados_task' => $arvados_task
1097       };
1098       push @jobstep, $jobstep;
1099       push @jobstep_todo, $#jobstep;
1100     }
1101   }
1102
1103   $progress_is_dirty = 1;
1104   1;
1105 }
1106
1107 sub check_refresh_wanted
1108 {
1109   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1110   if (@stat && $stat[9] > $latest_refresh) {
1111     $latest_refresh = scalar time;
1112     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1113     for my $attr ('cancelled_at',
1114                   'cancelled_by_user_uuid',
1115                   'cancelled_by_client_uuid',
1116                   'state') {
1117       $Job->{$attr} = $Job2->{$attr};
1118     }
1119     if ($Job->{'state'} ne "Running") {
1120       if ($Job->{'state'} eq "Cancelled") {
1121         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1122       } else {
1123         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1124       }
1125       $main::success = 0;
1126       $main::please_freeze = 1;
1127     }
1128   }
1129 }
1130
1131 sub check_squeue
1132 {
1133   # return if the kill list was checked <4 seconds ago
1134   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1135   {
1136     return;
1137   }
1138   $squeue_kill_checked = time;
1139
1140   # use killem() on procs whose killtime is reached
1141   for (keys %proc)
1142   {
1143     if (exists $proc{$_}->{killtime}
1144         && $proc{$_}->{killtime} <= time)
1145     {
1146       killem ($_);
1147     }
1148   }
1149
1150   # return if the squeue was checked <60 seconds ago
1151   if (defined $squeue_checked && $squeue_checked > time - 60)
1152   {
1153     return;
1154   }
1155   $squeue_checked = time;
1156
1157   if (!$have_slurm)
1158   {
1159     # here is an opportunity to check for mysterious problems with local procs
1160     return;
1161   }
1162
1163   # get a list of steps still running
1164   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1165   chop @squeue;
1166   if ($squeue[-1] ne "ok")
1167   {
1168     return;
1169   }
1170   pop @squeue;
1171
1172   # which of my jobsteps are running, according to squeue?
1173   my %ok;
1174   foreach (@squeue)
1175   {
1176     if (/^(\d+)\.(\d+) (\S+)/)
1177     {
1178       if ($1 eq $ENV{SLURM_JOBID})
1179       {
1180         $ok{$3} = 1;
1181       }
1182     }
1183   }
1184
1185   # which of my active child procs (>60s old) were not mentioned by squeue?
1186   foreach (keys %proc)
1187   {
1188     if ($proc{$_}->{time} < time - 60
1189         && !exists $ok{$proc{$_}->{jobstepname}}
1190         && !exists $proc{$_}->{killtime})
1191     {
1192       # kill this proc if it hasn't exited in 30 seconds
1193       $proc{$_}->{killtime} = time + 30;
1194     }
1195   }
1196 }
1197
1198
1199 sub release_allocation
1200 {
1201   if ($have_slurm)
1202   {
1203     Log (undef, "release job allocation");
1204     system "scancel $ENV{SLURM_JOBID}";
1205   }
1206 }
1207
1208
1209 sub readfrompipes
1210 {
1211   my $gotsome = 0;
1212   foreach my $job (keys %reader)
1213   {
1214     my $buf;
1215     while (0 < sysread ($reader{$job}, $buf, 8192))
1216     {
1217       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1218       $jobstep[$job]->{stderr} .= $buf;
1219       preprocess_stderr ($job);
1220       if (length ($jobstep[$job]->{stderr}) > 16384)
1221       {
1222         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1223       }
1224       $gotsome = 1;
1225     }
1226   }
1227   return $gotsome;
1228 }
1229
1230
1231 sub preprocess_stderr
1232 {
1233   my $job = shift;
1234
1235   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1236     my $line = $1;
1237     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1238     Log ($job, "stderr $line");
1239     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1240       # whoa.
1241       $main::please_freeze = 1;
1242     }
1243     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1244       $jobstep[$job]->{node_fail} = 1;
1245       ban_node_by_slot($jobstep[$job]->{slotindex});
1246     }
1247   }
1248 }
1249
1250
1251 sub process_stderr
1252 {
1253   my $job = shift;
1254   my $task_success = shift;
1255   preprocess_stderr ($job);
1256
1257   map {
1258     Log ($job, "stderr $_");
1259   } split ("\n", $jobstep[$job]->{stderr});
1260 }
1261
1262 sub fetch_block
1263 {
1264   my $hash = shift;
1265   my $keep;
1266   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1267     Log(undef, "fetch_block run error from arv-get $hash: $!");
1268     return undef;
1269   }
1270   my $output_block = "";
1271   while (1) {
1272     my $buf;
1273     my $bytes = sysread($keep, $buf, 1024 * 1024);
1274     if (!defined $bytes) {
1275       Log(undef, "fetch_block read error from arv-get: $!");
1276       $output_block = undef;
1277       last;
1278     } elsif ($bytes == 0) {
1279       # sysread returns 0 at the end of the pipe.
1280       last;
1281     } else {
1282       # some bytes were read into buf.
1283       $output_block .= $buf;
1284     }
1285   }
1286   close $keep;
1287   if ($?) {
1288     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1289     $output_block = undef;
1290   }
1291   return $output_block;
1292 }
1293
1294 # Create a collection by concatenating the output of all tasks (each
1295 # task's output is either a manifest fragment, a locator for a
1296 # manifest fragment stored in Keep, or nothing at all). Return the
1297 # portable_data_hash of the new collection.
1298 sub create_output_collection
1299 {
1300   Log (undef, "collate");
1301
1302   my ($child_out, $child_in);
1303   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1304 import arvados
1305 import sys
1306 print (arvados.api("v1").collections().
1307        create(body={"manifest_text": sys.stdin.read()}).
1308        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1309 }, retry_count());
1310
1311   my $task_idx = -1;
1312   my $manifest_size = 0;
1313   for (@jobstep)
1314   {
1315     ++$task_idx;
1316     my $output = $_->{'arvados_task'}->{output};
1317     next if (!defined($output));
1318     my $next_write;
1319     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1320       $next_write = fetch_block($output);
1321     } else {
1322       $next_write = $output;
1323     }
1324     if (defined($next_write)) {
1325       if (!defined(syswrite($child_in, $next_write))) {
1326         # There's been an error writing.  Stop the loop.
1327         # We'll log details about the exit code later.
1328         last;
1329       } else {
1330         $manifest_size += length($next_write);
1331       }
1332     } else {
1333       my $uuid = $_->{'arvados_task'}->{'uuid'};
1334       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1335       $main::success = 0;
1336     }
1337   }
1338   close($child_in);
1339   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1340
1341   my $joboutput;
1342   my $s = IO::Select->new($child_out);
1343   if ($s->can_read(120)) {
1344     sysread($child_out, $joboutput, 1024 * 1024);
1345     waitpid($pid, 0);
1346     if ($?) {
1347       Log(undef, "output collection creation exited " . exit_status_s($?));
1348       $joboutput = undef;
1349     } else {
1350       chomp($joboutput);
1351     }
1352   } else {
1353     Log (undef, "timed out while creating output collection");
1354     foreach my $signal (2, 2, 2, 15, 15, 9) {
1355       kill($signal, $pid);
1356       last if waitpid($pid, WNOHANG) == -1;
1357       sleep(1);
1358     }
1359   }
1360   close($child_out);
1361
1362   return $joboutput;
1363 }
1364
1365
1366 sub killem
1367 {
1368   foreach (@_)
1369   {
1370     my $sig = 2;                # SIGINT first
1371     if (exists $proc{$_}->{"sent_$sig"} &&
1372         time - $proc{$_}->{"sent_$sig"} > 4)
1373     {
1374       $sig = 15;                # SIGTERM if SIGINT doesn't work
1375     }
1376     if (exists $proc{$_}->{"sent_$sig"} &&
1377         time - $proc{$_}->{"sent_$sig"} > 4)
1378     {
1379       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1380     }
1381     if (!exists $proc{$_}->{"sent_$sig"})
1382     {
1383       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1384       kill $sig, $_;
1385       select (undef, undef, undef, 0.1);
1386       if ($sig == 2)
1387       {
1388         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1389       }
1390       $proc{$_}->{"sent_$sig"} = time;
1391       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1392     }
1393   }
1394 }
1395
1396
1397 sub fhbits
1398 {
1399   my($bits);
1400   for (@_) {
1401     vec($bits,fileno($_),1) = 1;
1402   }
1403   $bits;
1404 }
1405
1406
1407 # Send log output to Keep via arv-put.
1408 #
1409 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1410 # $log_pipe_pid is the pid of the arv-put subprocess.
1411 #
1412 # The only functions that should access these variables directly are:
1413 #
1414 # log_writer_start($logfilename)
1415 #     Starts an arv-put pipe, reading data on stdin and writing it to
1416 #     a $logfilename file in an output collection.
1417 #
1418 # log_writer_send($txt)
1419 #     Writes $txt to the output log collection.
1420 #
1421 # log_writer_finish()
1422 #     Closes the arv-put pipe and returns the output that it produces.
1423 #
1424 # log_writer_is_active()
1425 #     Returns a true value if there is currently a live arv-put
1426 #     process, false otherwise.
1427 #
1428 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1429
1430 sub log_writer_start($)
1431 {
1432   my $logfilename = shift;
1433   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1434                         'arv-put',
1435                         '--portable-data-hash',
1436                         '--project-uuid', $Job->{owner_uuid},
1437                         '--retries', '3',
1438                         '--name', $logfilename,
1439                         '--filename', $logfilename,
1440                         '-');
1441 }
1442
1443 sub log_writer_send($)
1444 {
1445   my $txt = shift;
1446   print $log_pipe_in $txt;
1447 }
1448
1449 sub log_writer_finish()
1450 {
1451   return unless $log_pipe_pid;
1452
1453   close($log_pipe_in);
1454   my $arv_put_output;
1455
1456   my $s = IO::Select->new($log_pipe_out);
1457   if ($s->can_read(120)) {
1458     sysread($log_pipe_out, $arv_put_output, 1024);
1459     chomp($arv_put_output);
1460   } else {
1461     Log (undef, "timed out reading from 'arv-put'");
1462   }
1463
1464   waitpid($log_pipe_pid, 0);
1465   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1466   if ($?) {
1467     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1468   }
1469
1470   return $arv_put_output;
1471 }
1472
1473 sub log_writer_is_active() {
1474   return $log_pipe_pid;
1475 }
1476
1477 sub Log                         # ($jobstep_id, $logmessage)
1478 {
1479   if ($_[1] =~ /\n/) {
1480     for my $line (split (/\n/, $_[1])) {
1481       Log ($_[0], $line);
1482     }
1483     return;
1484   }
1485   my $fh = select STDERR; $|=1; select $fh;
1486   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1487   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1488   $message .= "\n";
1489   my $datetime;
1490   if (log_writer_is_active() || -t STDERR) {
1491     my @gmtime = gmtime;
1492     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1493                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1494   }
1495   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1496
1497   if (log_writer_is_active()) {
1498     log_writer_send($datetime . " " . $message);
1499   }
1500 }
1501
1502
1503 sub croak
1504 {
1505   my ($package, $file, $line) = caller;
1506   my $message = "@_ at $file line $line\n";
1507   Log (undef, $message);
1508   freeze() if @jobstep_todo;
1509   create_output_collection() if @jobstep_todo;
1510   cleanup();
1511   save_meta();
1512   die;
1513 }
1514
1515
1516 sub cleanup
1517 {
1518   return unless $Job;
1519   if ($Job->{'state'} eq 'Cancelled') {
1520     $Job->update_attributes('finished_at' => scalar gmtime);
1521   } else {
1522     $Job->update_attributes('state' => 'Failed');
1523   }
1524 }
1525
1526
1527 sub save_meta
1528 {
1529   my $justcheckpoint = shift; # false if this will be the last meta saved
1530   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1531   return unless log_writer_is_active();
1532
1533   my $loglocator = log_writer_finish();
1534   Log (undef, "log manifest is $loglocator");
1535   $Job->{'log'} = $loglocator;
1536   $Job->update_attributes('log', $loglocator);
1537 }
1538
1539
1540 sub freeze_if_want_freeze
1541 {
1542   if ($main::please_freeze)
1543   {
1544     release_allocation();
1545     if (@_)
1546     {
1547       # kill some srun procs before freeze+stop
1548       map { $proc{$_} = {} } @_;
1549       while (%proc)
1550       {
1551         killem (keys %proc);
1552         select (undef, undef, undef, 0.1);
1553         my $died;
1554         while (($died = waitpid (-1, WNOHANG)) > 0)
1555         {
1556           delete $proc{$died};
1557         }
1558       }
1559     }
1560     freeze();
1561     create_output_collection();
1562     cleanup();
1563     save_meta();
1564     exit 1;
1565   }
1566 }
1567
1568
1569 sub freeze
1570 {
1571   Log (undef, "Freeze not implemented");
1572   return;
1573 }
1574
1575
1576 sub thaw
1577 {
1578   croak ("Thaw not implemented");
1579 }
1580
1581
1582 sub freezequote
1583 {
1584   my $s = shift;
1585   $s =~ s/\\/\\\\/g;
1586   $s =~ s/\n/\\n/g;
1587   return $s;
1588 }
1589
1590
1591 sub freezeunquote
1592 {
1593   my $s = shift;
1594   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1595   return $s;
1596 }
1597
1598
1599 sub srun
1600 {
1601   my $srunargs = shift;
1602   my $execargs = shift;
1603   my $opts = shift || {};
1604   my $stdin = shift;
1605   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1606
1607   $Data::Dumper::Terse = 1;
1608   $Data::Dumper::Indent = 0;
1609   my $show_cmd = Dumper($args);
1610   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1611   $show_cmd =~ s/\n/ /g;
1612   warn "starting: $show_cmd\n";
1613
1614   if (defined $stdin) {
1615     my $child = open STDIN, "-|";
1616     defined $child or die "no fork: $!";
1617     if ($child == 0) {
1618       print $stdin or die $!;
1619       close STDOUT or die $!;
1620       exit 0;
1621     }
1622   }
1623
1624   return system (@$args) if $opts->{fork};
1625
1626   exec @$args;
1627   warn "ENV size is ".length(join(" ",%ENV));
1628   die "exec failed: $!: @$args";
1629 }
1630
1631
1632 sub ban_node_by_slot {
1633   # Don't start any new jobsteps on this node for 60 seconds
1634   my $slotid = shift;
1635   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1636   $slot[$slotid]->{node}->{hold_count}++;
1637   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1638 }
1639
1640 sub must_lock_now
1641 {
1642   my ($lockfile, $error_message) = @_;
1643   open L, ">", $lockfile or croak("$lockfile: $!");
1644   if (!flock L, LOCK_EX|LOCK_NB) {
1645     croak("Can't lock $lockfile: $error_message\n");
1646   }
1647 }
1648
1649 sub find_docker_image {
1650   # Given a Keep locator, check to see if it contains a Docker image.
1651   # If so, return its stream name and Docker hash.
1652   # If not, return undef for both values.
1653   my $locator = shift;
1654   my ($streamname, $filename);
1655   my $image = api_call("collections/get", uuid => $locator);
1656   if ($image) {
1657     foreach my $line (split(/\n/, $image->{manifest_text})) {
1658       my @tokens = split(/\s+/, $line);
1659       next if (!@tokens);
1660       $streamname = shift(@tokens);
1661       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1662         if (defined($filename)) {
1663           return (undef, undef);  # More than one file in the Collection.
1664         } else {
1665           $filename = (split(/:/, $filedata, 3))[2];
1666         }
1667       }
1668     }
1669   }
1670   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1671     return ($streamname, $1);
1672   } else {
1673     return (undef, undef);
1674   }
1675 }
1676
1677 sub retry_count {
1678   # Calculate the number of times an operation should be retried,
1679   # assuming exponential backoff, and that we're willing to retry as
1680   # long as tasks have been running.  Enforce a minimum of 3 retries.
1681   my ($starttime, $endtime, $timediff, $retries);
1682   if (@jobstep) {
1683     $starttime = $jobstep[0]->{starttime};
1684     $endtime = $jobstep[-1]->{finishtime};
1685   }
1686   if (!defined($starttime)) {
1687     $timediff = 0;
1688   } elsif (!defined($endtime)) {
1689     $timediff = time - $starttime;
1690   } else {
1691     $timediff = ($endtime - $starttime) - (time - $endtime);
1692   }
1693   if ($timediff > 0) {
1694     $retries = int(log($timediff) / log(2));
1695   } else {
1696     $retries = 1;  # Use the minimum.
1697   }
1698   return ($retries > 3) ? $retries : 3;
1699 }
1700
1701 sub retry_op {
1702   # Pass in two function references.
1703   # This method will be called with the remaining arguments.
1704   # If it dies, retry it with exponential backoff until it succeeds,
1705   # or until the current retry_count is exhausted.  After each failure
1706   # that can be retried, the second function will be called with
1707   # the current try count (0-based), next try time, and error message.
1708   my $operation = shift;
1709   my $retry_callback = shift;
1710   my $retries = retry_count();
1711   foreach my $try_count (0..$retries) {
1712     my $next_try = time + (2 ** $try_count);
1713     my $result = eval { $operation->(@_); };
1714     if (!$@) {
1715       return $result;
1716     } elsif ($try_count < $retries) {
1717       $retry_callback->($try_count, $next_try, $@);
1718       my $sleep_time = $next_try - time;
1719       sleep($sleep_time) if ($sleep_time > 0);
1720     }
1721   }
1722   # Ensure the error message ends in a newline, so Perl doesn't add
1723   # retry_op's line number to it.
1724   chomp($@);
1725   die($@ . "\n");
1726 }
1727
1728 sub api_call {
1729   # Pass in a /-separated API method name, and arguments for it.
1730   # This function will call that method, retrying as needed until
1731   # the current retry_count is exhausted, with a log on the first failure.
1732   my $method_name = shift;
1733   my $log_api_retry = sub {
1734     my ($try_count, $next_try_at, $errmsg) = @_;
1735     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1736     $errmsg =~ s/\s/ /g;
1737     $errmsg =~ s/\s+$//;
1738     my $retry_msg;
1739     if ($next_try_at < time) {
1740       $retry_msg = "Retrying.";
1741     } else {
1742       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1743       $retry_msg = "Retrying at $next_try_fmt.";
1744     }
1745     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1746   };
1747   my $method = $arv;
1748   foreach my $key (split(/\//, $method_name)) {
1749     $method = $method->{$key};
1750   }
1751   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1752 }
1753
1754 sub exit_status_s {
1755   # Given a $?, return a human-readable exit code string like "0" or
1756   # "1" or "0 with signal 1" or "1 with signal 11".
1757   my $exitcode = shift;
1758   my $s = $exitcode >> 8;
1759   if ($exitcode & 0x7f) {
1760     $s .= " with signal " . ($exitcode & 0x7f);
1761   }
1762   if ($exitcode & 0x80) {
1763     $s .= " with core dump";
1764   }
1765   return $s;
1766 }
1767
1768 sub handle_readall {
1769   # Pass in a glob reference to a file handle.
1770   # Read all its contents and return them as a string.
1771   my $fh_glob_ref = shift;
1772   local $/ = undef;
1773   return <$fh_glob_ref>;
1774 }
1775
1776 sub tar_filename_n {
1777   my $n = shift;
1778   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1779 }
1780
1781 sub add_git_archive {
1782   # Pass in a git archive command as a string or list, a la system().
1783   # This method will save its output to be included in the archive sent to the
1784   # build script.
1785   my $git_input;
1786   $git_tar_count++;
1787   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1788     croak("Failed to save git archive: $!");
1789   }
1790   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1791   close($git_input);
1792   waitpid($git_pid, 0);
1793   close(GIT_ARCHIVE);
1794   if ($?) {
1795     croak("Failed to save git archive: git exited " . exit_status_s($?));
1796   }
1797 }
1798
1799 sub combined_git_archive {
1800   # Combine all saved tar archives into a single archive, then return its
1801   # contents in a string.  Return undef if no archives have been saved.
1802   if ($git_tar_count < 1) {
1803     return undef;
1804   }
1805   my $base_tar_name = tar_filename_n(1);
1806   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1807     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1808     if ($tar_exit != 0) {
1809       croak("Error preparing build archive: tar -A exited " .
1810             exit_status_s($tar_exit));
1811     }
1812   }
1813   if (!open(GIT_TAR, "<", $base_tar_name)) {
1814     croak("Could not open build archive: $!");
1815   }
1816   my $tar_contents = handle_readall(\*GIT_TAR);
1817   close(GIT_TAR);
1818   return $tar_contents;
1819 }
1820
1821 __DATA__
1822 #!/usr/bin/perl
1823 #
1824 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1825 # server invokes this script on individual compute nodes, or localhost if we're
1826 # running a job locally.  It gets called in two modes:
1827 #
1828 # * No arguments: Installation mode.  Read a tar archive from the DATA
1829 #   file handle; it includes the Crunch script's source code, and
1830 #   maybe SDKs as well.  Those should be installed in the proper
1831 #   locations.  This runs outside of any Docker container, so don't try to
1832 #   introspect Crunch's runtime environment.
1833 #
1834 # * With arguments: Crunch script run mode.  This script should set up the
1835 #   environment, then run the command specified in the arguments.  This runs
1836 #   inside any Docker container.
1837
1838 use Fcntl ':flock';
1839 use File::Path qw( make_path remove_tree );
1840 use POSIX qw(getcwd);
1841
1842 # Map SDK subdirectories to the path environments they belong to.
1843 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1844
1845 my $destdir = $ENV{"CRUNCH_SRC"};
1846 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1847 my $repo = $ENV{"CRUNCH_SRC_URL"};
1848 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1849 my $job_work = $ENV{"JOB_WORK"};
1850 my $task_work = $ENV{"TASK_WORK"};
1851
1852 for my $dir ($destdir, $job_work, $task_work) {
1853   if ($dir) {
1854     make_path $dir;
1855     -e $dir or die "Failed to create temporary directory ($dir): $!";
1856   }
1857 }
1858
1859 if ($task_work) {
1860   remove_tree($task_work, {keep_root => 1});
1861 }
1862
1863 open(STDOUT_ORIG, ">&", STDOUT);
1864 open(STDERR_ORIG, ">&", STDERR);
1865 open(STDOUT, ">>", "$destdir.log");
1866 open(STDERR, ">&", STDOUT);
1867
1868 ### Crunch script run mode
1869 if (@ARGV) {
1870   # We want to do routine logging during task 0 only.  This gives the user
1871   # the information they need, but avoids repeating the information for every
1872   # task.
1873   my $Log;
1874   if ($ENV{TASK_SEQUENCE} eq "0") {
1875     $Log = sub {
1876       my $msg = shift;
1877       printf STDERR_ORIG "[Crunch] $msg\n", @_;
1878     };
1879   } else {
1880     $Log = sub { };
1881   }
1882
1883   my $python_src = "$install_dir/python";
1884   my $venv_dir = "$job_work/.arvados.venv";
1885   my $venv_built = -e "$venv_dir/bin/activate";
1886   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1887     shell_or_die("virtualenv", "--quiet", "--system-site-packages",
1888                  "--python=python2.7", $venv_dir);
1889     shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1890     $venv_built = 1;
1891     $Log->("Built Python SDK virtualenv");
1892   }
1893
1894   my $pip_bin = "pip";
1895   if ($venv_built) {
1896     $Log->("Running in Python SDK virtualenv");
1897     $pip_bin = "$venv_dir/bin/pip";
1898     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
1899     @ARGV = ("/bin/sh", "-ec",
1900              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
1901   } elsif (-d $python_src) {
1902     $Log->("Warning: virtualenv not found inside Docker container default " .
1903            "\$PATH. Can't install Python SDK.");
1904   }
1905
1906   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
1907   if ($pkgs) {
1908     $Log->("Using Arvados SDK:");
1909     foreach my $line (split /\n/, $pkgs) {
1910       $Log->($line);
1911     }
1912   } else {
1913     $Log->("Arvados SDK packages not found");
1914   }
1915
1916   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
1917     my $sdk_path = "$install_dir/$sdk_dir";
1918     if (-d $sdk_path) {
1919       if ($ENV{$sdk_envkey}) {
1920         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
1921       } else {
1922         $ENV{$sdk_envkey} = $sdk_path;
1923       }
1924       $Log->("Arvados SDK added to %s", $sdk_envkey);
1925     }
1926   }
1927
1928   close(STDOUT);
1929   close(STDERR);
1930   open(STDOUT, ">&", STDOUT_ORIG);
1931   open(STDERR, ">&", STDERR_ORIG);
1932   exec(@ARGV);
1933   die "Cannot exec `@ARGV`: $!";
1934 }
1935
1936 ### Installation mode
1937 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1938 flock L, LOCK_EX;
1939 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1940   # This version already installed -> nothing to do.
1941   exit(0);
1942 }
1943
1944 unlink "$destdir.commit";
1945 mkdir $destdir;
1946
1947 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
1948   die "Error launching 'tar -xC $destdir': $!";
1949 }
1950 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
1951 # get SIGPIPE.  We must feed it data incrementally.
1952 my $tar_input;
1953 while (read(DATA, $tar_input, 65536)) {
1954   print TARX $tar_input;
1955 }
1956 if(!close(TARX)) {
1957   die "'tar -xC $destdir' exited $?: $!";
1958 }
1959
1960 mkdir $install_dir;
1961
1962 my $sdk_root = "$destdir/.arvados.sdk/sdk";
1963 if (-d $sdk_root) {
1964   foreach my $sdk_lang (("python",
1965                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
1966     if (-d "$sdk_root/$sdk_lang") {
1967       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
1968         die "Failed to install $sdk_lang SDK: $!";
1969       }
1970     }
1971   }
1972 }
1973
1974 my $python_dir = "$install_dir/python";
1975 if ((-d $python_dir) and can_run("python2.7") and
1976     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
1977   # egg_info failed, probably when it asked git for a build tag.
1978   # Specify no build tag.
1979   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
1980   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
1981   close($pysdk_cfg);
1982 }
1983
1984 if (-e "$destdir/crunch_scripts/install") {
1985     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1986 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1987     # Old version
1988     shell_or_die ("./tests/autotests.sh", $install_dir);
1989 } elsif (-e "./install.sh") {
1990     shell_or_die ("./install.sh", $install_dir);
1991 }
1992
1993 if ($commit) {
1994     unlink "$destdir.commit.new";
1995     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1996     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1997 }
1998
1999 close L;
2000
2001 sub can_run {
2002   my $command_name = shift;
2003   open(my $which, "-|", "which", $command_name);
2004   while (<$which>) { }
2005   close($which);
2006   return ($? == 0);
2007 }
2008
2009 sub shell_or_die
2010 {
2011   if ($ENV{"DEBUG"}) {
2012     print STDERR "@_\n";
2013   }
2014   if (system (@_) != 0) {
2015     my $err = $!;
2016     my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
2017     open STDERR, ">&STDERR_ORIG";
2018     system ("cat $destdir.log >&2");
2019     die "@_ failed ($err): $exitstatus";
2020   }
2021 }
2022
2023 __DATA__