Merge branch '5425-set-docker-memory-limits' refs #5425
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105   if ($ENV{"USER"} ne "crunch" && $< != 0) {
106     # use a tmp dir unique for my uid
107     $ENV{"CRUNCH_TMP"} .= "-$<";
108   }
109 }
110
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
114 }
115
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
120
121 my $force_unlock;
122 my $git_dir;
123 my $jobspec;
124 my $job_api_token;
125 my $no_clear_tmp;
126 my $resume_stash;
127 GetOptions('force-unlock' => \$force_unlock,
128            'git-dir=s' => \$git_dir,
129            'job=s' => \$jobspec,
130            'job-api-token=s' => \$job_api_token,
131            'no-clear-tmp' => \$no_clear_tmp,
132            'resume-stash=s' => \$resume_stash,
133     );
134
135 if (defined $job_api_token) {
136   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
137 }
138
139 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
140 my $local_job = 0;
141
142
143 $SIG{'USR1'} = sub
144 {
145   $main::ENV{CRUNCH_DEBUG} = 1;
146 };
147 $SIG{'USR2'} = sub
148 {
149   $main::ENV{CRUNCH_DEBUG} = 0;
150 };
151
152
153
154 my $arv = Arvados->new('apiVersion' => 'v1');
155
156 my $Job;
157 my $job_id;
158 my $dbh;
159 my $sth;
160 my @jobstep;
161
162 my $User = api_call("users/current");
163
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   if (!$force_unlock) {
169     # Claim this job, and make sure nobody else does
170     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
171     if ($@) {
172       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
173       exit EX_TEMPFAIL;
174     };
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189   $Job->{'state'} = 'Running';
190
191   $Job = api_call("jobs/create", job => $Job);
192 }
193 $job_id = $Job->{'uuid'};
194
195 my $keep_logfile = $job_id . '.log.txt';
196 log_writer_start($keep_logfile);
197
198 $Job->{'runtime_constraints'} ||= {};
199 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
200 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
201
202 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
203 if ($? == 0) {
204   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
205   chomp($gem_versions);
206   chop($gem_versions);  # Closing parentheses
207 } else {
208   $gem_versions = "";
209 }
210 Log(undef,
211     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
212
213 Log (undef, "check slurm allocation");
214 my @slot;
215 my @node;
216 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
217 my @sinfo;
218 if (!$have_slurm)
219 {
220   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
221   push @sinfo, "$localcpus localhost";
222 }
223 if (exists $ENV{SLURM_NODELIST})
224 {
225   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
226 }
227 foreach (@sinfo)
228 {
229   my ($ncpus, $slurm_nodelist) = split;
230   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
231
232   my @nodelist;
233   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
234   {
235     my $nodelist = $1;
236     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
237     {
238       my $ranges = $1;
239       foreach (split (",", $ranges))
240       {
241         my ($a, $b);
242         if (/(\d+)-(\d+)/)
243         {
244           $a = $1;
245           $b = $2;
246         }
247         else
248         {
249           $a = $_;
250           $b = $_;
251         }
252         push @nodelist, map {
253           my $n = $nodelist;
254           $n =~ s/\[[-,\d]+\]/$_/;
255           $n;
256         } ($a..$b);
257       }
258     }
259     else
260     {
261       push @nodelist, $nodelist;
262     }
263   }
264   foreach my $nodename (@nodelist)
265   {
266     Log (undef, "node $nodename - $ncpus slots");
267     my $node = { name => $nodename,
268                  ncpus => $ncpus,
269                  losing_streak => 0,
270                  hold_until => 0 };
271     foreach my $cpu (1..$ncpus)
272     {
273       push @slot, { node => $node,
274                     cpu => $cpu };
275     }
276   }
277   push @node, @nodelist;
278 }
279
280
281
282 # Ensure that we get one jobstep running on each allocated node before
283 # we start overloading nodes with concurrent steps
284
285 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
286
287
288 $Job->update_attributes(
289   'tasks_summary' => { 'failed' => 0,
290                        'todo' => 1,
291                        'running' => 0,
292                        'done' => 0 });
293
294 Log (undef, "start");
295 $SIG{'INT'} = sub { $main::please_freeze = 1; };
296 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
297 $SIG{'TERM'} = \&croak;
298 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
299 $SIG{'ALRM'} = sub { $main::please_info = 1; };
300 $SIG{'CONT'} = sub { $main::please_continue = 1; };
301 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
302
303 $main::please_freeze = 0;
304 $main::please_info = 0;
305 $main::please_continue = 0;
306 $main::please_refresh = 0;
307 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
308
309 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
310 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
311 $ENV{"JOB_UUID"} = $job_id;
312
313
314 my @jobstep_todo = ();
315 my @jobstep_done = ();
316 my @jobstep_tomerge = ();
317 my $jobstep_tomerge_level = 0;
318 my $squeue_checked;
319 my $squeue_kill_checked;
320 my $latest_refresh = scalar time;
321
322
323
324 if (defined $Job->{thawedfromkey})
325 {
326   thaw ($Job->{thawedfromkey});
327 }
328 else
329 {
330   my $first_task = api_call("job_tasks/create", job_task => {
331     'job_uuid' => $Job->{'uuid'},
332     'sequence' => 0,
333     'qsequence' => 0,
334     'parameters' => {},
335   });
336   push @jobstep, { 'level' => 0,
337                    'failures' => 0,
338                    'arvados_task' => $first_task,
339                  };
340   push @jobstep_todo, 0;
341 }
342
343
344 if (!$have_slurm)
345 {
346   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
347 }
348
349 my $build_script = handle_readall(\*DATA);
350 my $nodelist = join(",", @node);
351 my $git_tar_count = 0;
352
353 if (!defined $no_clear_tmp) {
354   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
355   Log (undef, "Clean work dirs");
356
357   my $cleanpid = fork();
358   if ($cleanpid == 0)
359   {
360     # Find FUSE mounts that look like Keep mounts (the mount path has the
361     # word "keep") and unmount them.  Then clean up work directories.
362     # TODO: When #5036 is done and widely deployed, we can get rid of the
363     # regular expression and just unmount everything with type fuse.keep.
364     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
365           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
366     exit (1);
367   }
368   while (1)
369   {
370     last if $cleanpid == waitpid (-1, WNOHANG);
371     freeze_if_want_freeze ($cleanpid);
372     select (undef, undef, undef, 0.1);
373   }
374   Log (undef, "Cleanup command exited ".exit_status_s($?));
375 }
376
377 # If this job requires a Docker image, install that.
378 my $docker_bin = "/usr/bin/docker.io";
379 my ($docker_locator, $docker_stream, $docker_hash);
380 if ($docker_locator = $Job->{docker_image_locator}) {
381   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
382   if (!$docker_hash)
383   {
384     croak("No Docker image hash found from locator $docker_locator");
385   }
386   $docker_stream =~ s/^\.//;
387   my $docker_install_script = qq{
388 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
389     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
390 fi
391 };
392   my $docker_pid = fork();
393   if ($docker_pid == 0)
394   {
395     srun (["srun", "--nodelist=" . join(',', @node)],
396           ["/bin/sh", "-ec", $docker_install_script]);
397     exit ($?);
398   }
399   while (1)
400   {
401     last if $docker_pid == waitpid (-1, WNOHANG);
402     freeze_if_want_freeze ($docker_pid);
403     select (undef, undef, undef, 0.1);
404   }
405   if ($? != 0)
406   {
407     croak("Installing Docker image from $docker_locator exited "
408           .exit_status_s($?));
409   }
410
411   if ($Job->{arvados_sdk_version}) {
412     # The job also specifies an Arvados SDK version.  Add the SDKs to the
413     # tar file for the build script to install.
414     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
415                        $Job->{arvados_sdk_version}));
416     add_git_archive("git", "--git-dir=$git_dir", "archive",
417                     "--prefix=.arvados.sdk/",
418                     $Job->{arvados_sdk_version}, "sdk");
419   }
420 }
421
422 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
423   # If script_version looks like an absolute path, *and* the --git-dir
424   # argument was not given -- which implies we were not invoked by
425   # crunch-dispatch -- we will use the given path as a working
426   # directory instead of resolving script_version to a git commit (or
427   # doing anything else with git).
428   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
429   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
430 }
431 else {
432   # Resolve the given script_version to a git commit sha1. Also, if
433   # the repository is remote, clone it into our local filesystem: this
434   # ensures "git archive" will work, and is necessary to reliably
435   # resolve a symbolic script_version like "master^".
436   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
437
438   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
439
440   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
441
442   # If we're running under crunch-dispatch, it will have already
443   # pulled the appropriate source tree into its own repository, and
444   # given us that repo's path as $git_dir.
445   #
446   # If we're running a "local" job, we might have to fetch content
447   # from a remote repository.
448   #
449   # (Currently crunch-dispatch gives a local path with --git-dir, but
450   # we might as well accept URLs there too in case it changes its
451   # mind.)
452   my $repo = $git_dir || $Job->{'repository'};
453
454   # Repository can be remote or local. If remote, we'll need to fetch it
455   # to a local dir before doing `git log` et al.
456   my $repo_location;
457
458   if ($repo =~ m{://|^[^/]*:}) {
459     # $repo is a git url we can clone, like git:// or https:// or
460     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
461     # not recognized here because distinguishing that from a local
462     # path is too fragile. If you really need something strange here,
463     # use the ssh:// form.
464     $repo_location = 'remote';
465   } elsif ($repo =~ m{^\.*/}) {
466     # $repo is a local path to a git index. We'll also resolve ../foo
467     # to ../foo/.git if the latter is a directory. To help
468     # disambiguate local paths from named hosted repositories, this
469     # form must be given as ./ or ../ if it's a relative path.
470     if (-d "$repo/.git") {
471       $repo = "$repo/.git";
472     }
473     $repo_location = 'local';
474   } else {
475     # $repo is none of the above. It must be the name of a hosted
476     # repository.
477     my $arv_repo_list = api_call("repositories/list",
478                                  'filters' => [['name','=',$repo]]);
479     my @repos_found = @{$arv_repo_list->{'items'}};
480     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
481     if ($n_found > 0) {
482       Log(undef, "Repository '$repo' -> "
483           . join(", ", map { $_->{'uuid'} } @repos_found));
484     }
485     if ($n_found != 1) {
486       croak("Error: Found $n_found repositories with name '$repo'.");
487     }
488     $repo = $repos_found[0]->{'fetch_url'};
489     $repo_location = 'remote';
490   }
491   Log(undef, "Using $repo_location repository '$repo'");
492   $ENV{"CRUNCH_SRC_URL"} = $repo;
493
494   # Resolve given script_version (we'll call that $treeish here) to a
495   # commit sha1 ($commit).
496   my $treeish = $Job->{'script_version'};
497   my $commit;
498   if ($repo_location eq 'remote') {
499     # We minimize excess object-fetching by re-using the same bare
500     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
501     # just keep adding remotes to it as needed.
502     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
503     my $gitcmd = "git --git-dir=\Q$local_repo\E";
504
505     # Set up our local repo for caching remote objects, making
506     # archives, etc.
507     if (!-d $local_repo) {
508       make_path($local_repo) or croak("Error: could not create $local_repo");
509     }
510     # This works (exits 0 and doesn't delete fetched objects) even
511     # if $local_repo is already initialized:
512     `$gitcmd init --bare`;
513     if ($?) {
514       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
515     }
516
517     # If $treeish looks like a hash (or abbrev hash) we look it up in
518     # our local cache first, since that's cheaper. (We don't want to
519     # do that with tags/branches though -- those change over time, so
520     # they should always be resolved by the remote repo.)
521     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
522       # Hide stderr because it's normal for this to fail:
523       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
524       if ($? == 0 &&
525           # Careful not to resolve a branch named abcdeff to commit 1234567:
526           $sha1 =~ /^$treeish/ &&
527           $sha1 =~ /^([0-9a-f]{40})$/s) {
528         $commit = $1;
529         Log(undef, "Commit $commit already present in $local_repo");
530       }
531     }
532
533     if (!defined $commit) {
534       # If $treeish isn't just a hash or abbrev hash, or isn't here
535       # yet, we need to fetch the remote to resolve it correctly.
536
537       # First, remove all local heads. This prevents a name that does
538       # not exist on the remote from resolving to (or colliding with)
539       # a previously fetched branch or tag (possibly from a different
540       # remote).
541       remove_tree("$local_repo/refs/heads", {keep_root => 1});
542
543       Log(undef, "Fetching objects from $repo to $local_repo");
544       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
545       if ($?) {
546         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
547       }
548     }
549
550     # Now that the data is all here, we will use our local repo for
551     # the rest of our git activities.
552     $repo = $local_repo;
553   }
554
555   my $gitcmd = "git --git-dir=\Q$repo\E";
556   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
557   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
558     croak("`$gitcmd rev-list` exited "
559           .exit_status_s($?)
560           .", '$treeish' not found. Giving up.");
561   }
562   $commit = $1;
563   Log(undef, "Version $treeish is commit $commit");
564
565   if ($commit ne $Job->{'script_version'}) {
566     # Record the real commit id in the database, frozentokey, logs,
567     # etc. -- instead of an abbreviation or a branch name which can
568     # become ambiguous or point to a different commit in the future.
569     if (!$Job->update_attributes('script_version' => $commit)) {
570       croak("Error: failed to update job's script_version attribute");
571     }
572   }
573
574   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
575   add_git_archive("$gitcmd archive ''\Q$commit\E");
576 }
577
578 my $git_archive = combined_git_archive();
579 if (!defined $git_archive) {
580   Log(undef, "Skip install phase (no git archive)");
581   if ($have_slurm) {
582     Log(undef, "Warning: This probably means workers have no source tree!");
583   }
584 }
585 else {
586   Log(undef, "Run install script on all workers");
587
588   my @srunargs = ("srun",
589                   "--nodelist=$nodelist",
590                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
591   my @execargs = ("sh", "-c",
592                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
593
594   my $installpid = fork();
595   if ($installpid == 0)
596   {
597     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
598     exit (1);
599   }
600   while (1)
601   {
602     last if $installpid == waitpid (-1, WNOHANG);
603     freeze_if_want_freeze ($installpid);
604     select (undef, undef, undef, 0.1);
605   }
606   my $install_exited = $?;
607   Log (undef, "Install script exited ".exit_status_s($install_exited));
608   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
609     unlink($tar_filename);
610   }
611   exit (1) if $install_exited != 0;
612 }
613
614 foreach (qw (script script_version script_parameters runtime_constraints))
615 {
616   Log (undef,
617        "$_ " .
618        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
619 }
620 foreach (split (/\n/, $Job->{knobs}))
621 {
622   Log (undef, "knob " . $_);
623 }
624
625
626
627 $main::success = undef;
628
629
630
631 ONELEVEL:
632
633 my $thisround_succeeded = 0;
634 my $thisround_failed = 0;
635 my $thisround_failed_multiple = 0;
636
637 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
638                        or $a <=> $b } @jobstep_todo;
639 my $level = $jobstep[$jobstep_todo[0]]->{level};
640 Log (undef, "start level $level");
641
642
643
644 my %proc;
645 my @freeslot = (0..$#slot);
646 my @holdslot;
647 my %reader;
648 my $progress_is_dirty = 1;
649 my $progress_stats_updated = 0;
650
651 update_progress_stats();
652
653
654
655 THISROUND:
656 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
657 {
658   my $id = $jobstep_todo[$todo_ptr];
659   my $Jobstep = $jobstep[$id];
660   if ($Jobstep->{level} != $level)
661   {
662     next;
663   }
664
665   pipe $reader{$id}, "writer" or croak ($!);
666   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
667   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
668
669   my $childslot = $freeslot[0];
670   my $childnode = $slot[$childslot]->{node};
671   my $childslotname = join (".",
672                             $slot[$childslot]->{node}->{name},
673                             $slot[$childslot]->{cpu});
674
675   my $childpid = fork();
676   if ($childpid == 0)
677   {
678     $SIG{'INT'} = 'DEFAULT';
679     $SIG{'QUIT'} = 'DEFAULT';
680     $SIG{'TERM'} = 'DEFAULT';
681
682     foreach (values (%reader))
683     {
684       close($_);
685     }
686     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
687     open(STDOUT,">&writer");
688     open(STDERR,">&writer");
689
690     undef $dbh;
691     undef $sth;
692
693     delete $ENV{"GNUPGHOME"};
694     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
695     $ENV{"TASK_QSEQUENCE"} = $id;
696     $ENV{"TASK_SEQUENCE"} = $level;
697     $ENV{"JOB_SCRIPT"} = $Job->{script};
698     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
699       $param =~ tr/a-z/A-Z/;
700       $ENV{"JOB_PARAMETER_$param"} = $value;
701     }
702     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
703     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
704     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
705     $ENV{"HOME"} = $ENV{"TASK_WORK"};
706     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
707     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
708     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
709     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
710
711     $ENV{"GZIP"} = "-n";
712
713     my @srunargs = (
714       "srun",
715       "--nodelist=".$childnode->{name},
716       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
717       "--job-name=$job_id.$id.$$",
718         );
719     my $command =
720         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
721         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
722         ."&& cd $ENV{CRUNCH_TMP} "
723         ."&& MEM=\$(cat /proc/meminfo | grep MemTotal | sed 's/\\s\\s*/ /g' |cut -d' ' -f2) "
724         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) ";
725     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
726     if ($docker_hash)
727     {
728       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
729       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
730       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy --memory=\${MEMLIMIT}k ";
731
732       # Dynamically configure the container to use the host system as its
733       # DNS server.  Get the host's global addresses from the ip command,
734       # and turn them into docker --dns options using gawk.
735       $command .=
736           q{$(ip -o address show scope global |
737               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
738
739       # The source tree and $destdir directory (which we have
740       # installed on the worker host) are available in the container,
741       # under the same path.
742       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
743       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
744
745       # Currently, we make arv-mount's mount point appear at /keep
746       # inside the container (instead of using the same path as the
747       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
748       # crunch scripts and utilities must not rely on this. They must
749       # use $TASK_KEEPMOUNT.
750       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
751       $ENV{TASK_KEEPMOUNT} = "/keep";
752
753       # TASK_WORK is almost exactly like a docker data volume: it
754       # starts out empty, is writable, and persists until no
755       # containers use it any more. We don't use --volumes-from to
756       # share it with other containers: it is only accessible to this
757       # task, and it goes away when this task stops.
758       #
759       # However, a docker data volume is writable only by root unless
760       # the mount point already happens to exist in the container with
761       # different permissions. Therefore, we [1] assume /tmp already
762       # exists in the image and is writable by the crunch user; [2]
763       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
764       # writable if they are created by docker while setting up the
765       # other --volumes); and [3] create $TASK_WORK inside the
766       # container using $build_script.
767       $command .= "--volume=/tmp ";
768       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
769       $ENV{"HOME"} = $ENV{"TASK_WORK"};
770       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
771
772       # TODO: Share a single JOB_WORK volume across all task
773       # containers on a given worker node, and delete it when the job
774       # ends (and, in case that doesn't work, when the next job
775       # starts).
776       #
777       # For now, use the same approach as TASK_WORK above.
778       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
779
780       while (my ($env_key, $env_val) = each %ENV)
781       {
782         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
783           $command .= "--env=\Q$env_key=$env_val\E ";
784         }
785       }
786       $command .= "--env=\QHOME=$ENV{HOME}\E ";
787       $command .= "\Q$docker_hash\E ";
788       $command .= "stdbuf --output=0 --error=0 ";
789       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
790     } else {
791       # Non-docker run
792       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
793       $command .= "stdbuf --output=0 --error=0 ";
794       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
795     }
796
797     my @execargs = ('bash', '-c', $command);
798     srun (\@srunargs, \@execargs, undef, $build_script);
799     # exec() failed, we assume nothing happened.
800     die "srun() failed on build script\n";
801   }
802   close("writer");
803   if (!defined $childpid)
804   {
805     close $reader{$id};
806     delete $reader{$id};
807     next;
808   }
809   shift @freeslot;
810   $proc{$childpid} = { jobstep => $id,
811                        time => time,
812                        slot => $childslot,
813                        jobstepname => "$job_id.$id.$childpid",
814                      };
815   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
816   $slot[$childslot]->{pid} = $childpid;
817
818   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
819   Log ($id, "child $childpid started on $childslotname");
820   $Jobstep->{starttime} = time;
821   $Jobstep->{node} = $childnode->{name};
822   $Jobstep->{slotindex} = $childslot;
823   delete $Jobstep->{stderr};
824   delete $Jobstep->{finishtime};
825
826   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
827   $Jobstep->{'arvados_task'}->save;
828
829   splice @jobstep_todo, $todo_ptr, 1;
830   --$todo_ptr;
831
832   $progress_is_dirty = 1;
833
834   while (!@freeslot
835          ||
836          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
837   {
838     last THISROUND if $main::please_freeze || defined($main::success);
839     if ($main::please_info)
840     {
841       $main::please_info = 0;
842       freeze();
843       create_output_collection();
844       save_meta(1);
845       update_progress_stats();
846     }
847     my $gotsome
848         = readfrompipes ()
849         + reapchildren ();
850     if (!$gotsome)
851     {
852       check_refresh_wanted();
853       check_squeue();
854       update_progress_stats();
855       select (undef, undef, undef, 0.1);
856     }
857     elsif (time - $progress_stats_updated >= 30)
858     {
859       update_progress_stats();
860     }
861     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
862         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
863     {
864       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
865           .($thisround_failed+$thisround_succeeded)
866           .") -- giving up on this round";
867       Log (undef, $message);
868       last THISROUND;
869     }
870
871     # move slots from freeslot to holdslot (or back to freeslot) if necessary
872     for (my $i=$#freeslot; $i>=0; $i--) {
873       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
874         push @holdslot, (splice @freeslot, $i, 1);
875       }
876     }
877     for (my $i=$#holdslot; $i>=0; $i--) {
878       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
879         push @freeslot, (splice @holdslot, $i, 1);
880       }
881     }
882
883     # give up if no nodes are succeeding
884     if (!grep { $_->{node}->{losing_streak} == 0 &&
885                     $_->{node}->{hold_count} < 4 } @slot) {
886       my $message = "Every node has failed -- giving up on this round";
887       Log (undef, $message);
888       last THISROUND;
889     }
890   }
891 }
892
893
894 push @freeslot, splice @holdslot;
895 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
896
897
898 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
899 while (%proc)
900 {
901   if ($main::please_continue) {
902     $main::please_continue = 0;
903     goto THISROUND;
904   }
905   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
906   readfrompipes ();
907   if (!reapchildren())
908   {
909     check_refresh_wanted();
910     check_squeue();
911     update_progress_stats();
912     select (undef, undef, undef, 0.1);
913     killem (keys %proc) if $main::please_freeze;
914   }
915 }
916
917 update_progress_stats();
918 freeze_if_want_freeze();
919
920
921 if (!defined $main::success)
922 {
923   if (@jobstep_todo &&
924       $thisround_succeeded == 0 &&
925       ($thisround_failed == 0 || $thisround_failed > 4))
926   {
927     my $message = "stop because $thisround_failed tasks failed and none succeeded";
928     Log (undef, $message);
929     $main::success = 0;
930   }
931   if (!@jobstep_todo)
932   {
933     $main::success = 1;
934   }
935 }
936
937 goto ONELEVEL if !defined $main::success;
938
939
940 release_allocation();
941 freeze();
942 my $collated_output = &create_output_collection();
943
944 if (!$collated_output) {
945   Log (undef, "Failed to write output collection");
946 }
947 else {
948   Log(undef, "job output $collated_output");
949   $Job->update_attributes('output' => $collated_output);
950 }
951
952 Log (undef, "finish");
953
954 save_meta();
955
956 my $final_state;
957 if ($collated_output && $main::success) {
958   $final_state = 'Complete';
959 } else {
960   $final_state = 'Failed';
961 }
962 $Job->update_attributes('state' => $final_state);
963
964 exit (($final_state eq 'Complete') ? 0 : 1);
965
966
967
968 sub update_progress_stats
969 {
970   $progress_stats_updated = time;
971   return if !$progress_is_dirty;
972   my ($todo, $done, $running) = (scalar @jobstep_todo,
973                                  scalar @jobstep_done,
974                                  scalar @slot - scalar @freeslot - scalar @holdslot);
975   $Job->{'tasks_summary'} ||= {};
976   $Job->{'tasks_summary'}->{'todo'} = $todo;
977   $Job->{'tasks_summary'}->{'done'} = $done;
978   $Job->{'tasks_summary'}->{'running'} = $running;
979   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
980   Log (undef, "status: $done done, $running running, $todo todo");
981   $progress_is_dirty = 0;
982 }
983
984
985
986 sub reapchildren
987 {
988   my $pid = waitpid (-1, WNOHANG);
989   return 0 if $pid <= 0;
990
991   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
992                   . "."
993                   . $slot[$proc{$pid}->{slot}]->{cpu});
994   my $jobstepid = $proc{$pid}->{jobstep};
995   my $elapsed = time - $proc{$pid}->{time};
996   my $Jobstep = $jobstep[$jobstepid];
997
998   my $childstatus = $?;
999   my $exitvalue = $childstatus >> 8;
1000   my $exitinfo = "exit ".exit_status_s($childstatus);
1001   $Jobstep->{'arvados_task'}->reload;
1002   my $task_success = $Jobstep->{'arvados_task'}->{success};
1003
1004   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1005
1006   if (!defined $task_success) {
1007     # task did not indicate one way or the other --> fail
1008     $Jobstep->{'arvados_task'}->{success} = 0;
1009     $Jobstep->{'arvados_task'}->save;
1010     $task_success = 0;
1011   }
1012
1013   if (!$task_success)
1014   {
1015     my $temporary_fail;
1016     $temporary_fail ||= $Jobstep->{node_fail};
1017     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1018
1019     ++$thisround_failed;
1020     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1021
1022     # Check for signs of a failed or misconfigured node
1023     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1024         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1025       # Don't count this against jobstep failure thresholds if this
1026       # node is already suspected faulty and srun exited quickly
1027       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1028           $elapsed < 5) {
1029         Log ($jobstepid, "blaming failure on suspect node " .
1030              $slot[$proc{$pid}->{slot}]->{node}->{name});
1031         $temporary_fail ||= 1;
1032       }
1033       ban_node_by_slot($proc{$pid}->{slot});
1034     }
1035
1036     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1037                              ++$Jobstep->{'failures'},
1038                              $temporary_fail ? 'temporary ' : 'permanent',
1039                              $elapsed));
1040
1041     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1042       # Give up on this task, and the whole job
1043       $main::success = 0;
1044     }
1045     # Put this task back on the todo queue
1046     push @jobstep_todo, $jobstepid;
1047     $Job->{'tasks_summary'}->{'failed'}++;
1048   }
1049   else
1050   {
1051     ++$thisround_succeeded;
1052     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1053     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1054     push @jobstep_done, $jobstepid;
1055     Log ($jobstepid, "success in $elapsed seconds");
1056   }
1057   $Jobstep->{exitcode} = $childstatus;
1058   $Jobstep->{finishtime} = time;
1059   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1060   $Jobstep->{'arvados_task'}->save;
1061   process_stderr ($jobstepid, $task_success);
1062   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1063                            length($Jobstep->{'arvados_task'}->{output}),
1064                            $Jobstep->{'arvados_task'}->{output}));
1065
1066   close $reader{$jobstepid};
1067   delete $reader{$jobstepid};
1068   delete $slot[$proc{$pid}->{slot}]->{pid};
1069   push @freeslot, $proc{$pid}->{slot};
1070   delete $proc{$pid};
1071
1072   if ($task_success) {
1073     # Load new tasks
1074     my $newtask_list = [];
1075     my $newtask_results;
1076     do {
1077       $newtask_results = api_call(
1078         "job_tasks/list",
1079         'where' => {
1080           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1081         },
1082         'order' => 'qsequence',
1083         'offset' => scalar(@$newtask_list),
1084       );
1085       push(@$newtask_list, @{$newtask_results->{items}});
1086     } while (@{$newtask_results->{items}});
1087     foreach my $arvados_task (@$newtask_list) {
1088       my $jobstep = {
1089         'level' => $arvados_task->{'sequence'},
1090         'failures' => 0,
1091         'arvados_task' => $arvados_task
1092       };
1093       push @jobstep, $jobstep;
1094       push @jobstep_todo, $#jobstep;
1095     }
1096   }
1097
1098   $progress_is_dirty = 1;
1099   1;
1100 }
1101
1102 sub check_refresh_wanted
1103 {
1104   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1105   if (@stat && $stat[9] > $latest_refresh) {
1106     $latest_refresh = scalar time;
1107     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1108     for my $attr ('cancelled_at',
1109                   'cancelled_by_user_uuid',
1110                   'cancelled_by_client_uuid',
1111                   'state') {
1112       $Job->{$attr} = $Job2->{$attr};
1113     }
1114     if ($Job->{'state'} ne "Running") {
1115       if ($Job->{'state'} eq "Cancelled") {
1116         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1117       } else {
1118         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1119       }
1120       $main::success = 0;
1121       $main::please_freeze = 1;
1122     }
1123   }
1124 }
1125
1126 sub check_squeue
1127 {
1128   # return if the kill list was checked <4 seconds ago
1129   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1130   {
1131     return;
1132   }
1133   $squeue_kill_checked = time;
1134
1135   # use killem() on procs whose killtime is reached
1136   for (keys %proc)
1137   {
1138     if (exists $proc{$_}->{killtime}
1139         && $proc{$_}->{killtime} <= time)
1140     {
1141       killem ($_);
1142     }
1143   }
1144
1145   # return if the squeue was checked <60 seconds ago
1146   if (defined $squeue_checked && $squeue_checked > time - 60)
1147   {
1148     return;
1149   }
1150   $squeue_checked = time;
1151
1152   if (!$have_slurm)
1153   {
1154     # here is an opportunity to check for mysterious problems with local procs
1155     return;
1156   }
1157
1158   # get a list of steps still running
1159   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1160   chop @squeue;
1161   if ($squeue[-1] ne "ok")
1162   {
1163     return;
1164   }
1165   pop @squeue;
1166
1167   # which of my jobsteps are running, according to squeue?
1168   my %ok;
1169   foreach (@squeue)
1170   {
1171     if (/^(\d+)\.(\d+) (\S+)/)
1172     {
1173       if ($1 eq $ENV{SLURM_JOBID})
1174       {
1175         $ok{$3} = 1;
1176       }
1177     }
1178   }
1179
1180   # which of my active child procs (>60s old) were not mentioned by squeue?
1181   foreach (keys %proc)
1182   {
1183     if ($proc{$_}->{time} < time - 60
1184         && !exists $ok{$proc{$_}->{jobstepname}}
1185         && !exists $proc{$_}->{killtime})
1186     {
1187       # kill this proc if it hasn't exited in 30 seconds
1188       $proc{$_}->{killtime} = time + 30;
1189     }
1190   }
1191 }
1192
1193
1194 sub release_allocation
1195 {
1196   if ($have_slurm)
1197   {
1198     Log (undef, "release job allocation");
1199     system "scancel $ENV{SLURM_JOBID}";
1200   }
1201 }
1202
1203
1204 sub readfrompipes
1205 {
1206   my $gotsome = 0;
1207   foreach my $job (keys %reader)
1208   {
1209     my $buf;
1210     while (0 < sysread ($reader{$job}, $buf, 8192))
1211     {
1212       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1213       $jobstep[$job]->{stderr} .= $buf;
1214       preprocess_stderr ($job);
1215       if (length ($jobstep[$job]->{stderr}) > 16384)
1216       {
1217         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1218       }
1219       $gotsome = 1;
1220     }
1221   }
1222   return $gotsome;
1223 }
1224
1225
1226 sub preprocess_stderr
1227 {
1228   my $job = shift;
1229
1230   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1231     my $line = $1;
1232     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1233     Log ($job, "stderr $line");
1234     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1235       # whoa.
1236       $main::please_freeze = 1;
1237     }
1238     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure)/) {
1239       $jobstep[$job]->{node_fail} = 1;
1240       ban_node_by_slot($jobstep[$job]->{slotindex});
1241     }
1242   }
1243 }
1244
1245
1246 sub process_stderr
1247 {
1248   my $job = shift;
1249   my $task_success = shift;
1250   preprocess_stderr ($job);
1251
1252   map {
1253     Log ($job, "stderr $_");
1254   } split ("\n", $jobstep[$job]->{stderr});
1255 }
1256
1257 sub fetch_block
1258 {
1259   my $hash = shift;
1260   my $keep;
1261   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1262     Log(undef, "fetch_block run error from arv-get $hash: $!");
1263     return undef;
1264   }
1265   my $output_block = "";
1266   while (1) {
1267     my $buf;
1268     my $bytes = sysread($keep, $buf, 1024 * 1024);
1269     if (!defined $bytes) {
1270       Log(undef, "fetch_block read error from arv-get: $!");
1271       $output_block = undef;
1272       last;
1273     } elsif ($bytes == 0) {
1274       # sysread returns 0 at the end of the pipe.
1275       last;
1276     } else {
1277       # some bytes were read into buf.
1278       $output_block .= $buf;
1279     }
1280   }
1281   close $keep;
1282   if ($?) {
1283     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1284     $output_block = undef;
1285   }
1286   return $output_block;
1287 }
1288
1289 # Create a collection by concatenating the output of all tasks (each
1290 # task's output is either a manifest fragment, a locator for a
1291 # manifest fragment stored in Keep, or nothing at all). Return the
1292 # portable_data_hash of the new collection.
1293 sub create_output_collection
1294 {
1295   Log (undef, "collate");
1296
1297   my ($child_out, $child_in);
1298   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1299 import arvados
1300 import sys
1301 print (arvados.api("v1").collections().
1302        create(body={"manifest_text": sys.stdin.read()}).
1303        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1304 }, retry_count());
1305
1306   my $task_idx = -1;
1307   my $manifest_size = 0;
1308   for (@jobstep)
1309   {
1310     ++$task_idx;
1311     my $output = $_->{'arvados_task'}->{output};
1312     next if (!defined($output));
1313     my $next_write;
1314     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1315       $next_write = fetch_block($output);
1316     } else {
1317       $next_write = $output;
1318     }
1319     if (defined($next_write)) {
1320       if (!defined(syswrite($child_in, $next_write))) {
1321         # There's been an error writing.  Stop the loop.
1322         # We'll log details about the exit code later.
1323         last;
1324       } else {
1325         $manifest_size += length($next_write);
1326       }
1327     } else {
1328       my $uuid = $_->{'arvados_task'}->{'uuid'};
1329       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1330       $main::success = 0;
1331     }
1332   }
1333   close($child_in);
1334   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1335
1336   my $joboutput;
1337   my $s = IO::Select->new($child_out);
1338   if ($s->can_read(120)) {
1339     sysread($child_out, $joboutput, 1024 * 1024);
1340     waitpid($pid, 0);
1341     if ($?) {
1342       Log(undef, "output collection creation exited " . exit_status_s($?));
1343       $joboutput = undef;
1344     } else {
1345       chomp($joboutput);
1346     }
1347   } else {
1348     Log (undef, "timed out while creating output collection");
1349     foreach my $signal (2, 2, 2, 15, 15, 9) {
1350       kill($signal, $pid);
1351       last if waitpid($pid, WNOHANG) == -1;
1352       sleep(1);
1353     }
1354   }
1355   close($child_out);
1356
1357   return $joboutput;
1358 }
1359
1360
1361 sub killem
1362 {
1363   foreach (@_)
1364   {
1365     my $sig = 2;                # SIGINT first
1366     if (exists $proc{$_}->{"sent_$sig"} &&
1367         time - $proc{$_}->{"sent_$sig"} > 4)
1368     {
1369       $sig = 15;                # SIGTERM if SIGINT doesn't work
1370     }
1371     if (exists $proc{$_}->{"sent_$sig"} &&
1372         time - $proc{$_}->{"sent_$sig"} > 4)
1373     {
1374       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1375     }
1376     if (!exists $proc{$_}->{"sent_$sig"})
1377     {
1378       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1379       kill $sig, $_;
1380       select (undef, undef, undef, 0.1);
1381       if ($sig == 2)
1382       {
1383         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1384       }
1385       $proc{$_}->{"sent_$sig"} = time;
1386       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1387     }
1388   }
1389 }
1390
1391
1392 sub fhbits
1393 {
1394   my($bits);
1395   for (@_) {
1396     vec($bits,fileno($_),1) = 1;
1397   }
1398   $bits;
1399 }
1400
1401
1402 # Send log output to Keep via arv-put.
1403 #
1404 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1405 # $log_pipe_pid is the pid of the arv-put subprocess.
1406 #
1407 # The only functions that should access these variables directly are:
1408 #
1409 # log_writer_start($logfilename)
1410 #     Starts an arv-put pipe, reading data on stdin and writing it to
1411 #     a $logfilename file in an output collection.
1412 #
1413 # log_writer_send($txt)
1414 #     Writes $txt to the output log collection.
1415 #
1416 # log_writer_finish()
1417 #     Closes the arv-put pipe and returns the output that it produces.
1418 #
1419 # log_writer_is_active()
1420 #     Returns a true value if there is currently a live arv-put
1421 #     process, false otherwise.
1422 #
1423 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1424
1425 sub log_writer_start($)
1426 {
1427   my $logfilename = shift;
1428   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1429                         'arv-put',
1430                         '--portable-data-hash',
1431                         '--project-uuid', $Job->{owner_uuid},
1432                         '--retries', '3',
1433                         '--name', $logfilename,
1434                         '--filename', $logfilename,
1435                         '-');
1436 }
1437
1438 sub log_writer_send($)
1439 {
1440   my $txt = shift;
1441   print $log_pipe_in $txt;
1442 }
1443
1444 sub log_writer_finish()
1445 {
1446   return unless $log_pipe_pid;
1447
1448   close($log_pipe_in);
1449   my $arv_put_output;
1450
1451   my $s = IO::Select->new($log_pipe_out);
1452   if ($s->can_read(120)) {
1453     sysread($log_pipe_out, $arv_put_output, 1024);
1454     chomp($arv_put_output);
1455   } else {
1456     Log (undef, "timed out reading from 'arv-put'");
1457   }
1458
1459   waitpid($log_pipe_pid, 0);
1460   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1461   if ($?) {
1462     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1463   }
1464
1465   return $arv_put_output;
1466 }
1467
1468 sub log_writer_is_active() {
1469   return $log_pipe_pid;
1470 }
1471
1472 sub Log                         # ($jobstep_id, $logmessage)
1473 {
1474   if ($_[1] =~ /\n/) {
1475     for my $line (split (/\n/, $_[1])) {
1476       Log ($_[0], $line);
1477     }
1478     return;
1479   }
1480   my $fh = select STDERR; $|=1; select $fh;
1481   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1482   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1483   $message .= "\n";
1484   my $datetime;
1485   if (log_writer_is_active() || -t STDERR) {
1486     my @gmtime = gmtime;
1487     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1488                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1489   }
1490   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1491
1492   if (log_writer_is_active()) {
1493     log_writer_send($datetime . " " . $message);
1494   }
1495 }
1496
1497
1498 sub croak
1499 {
1500   my ($package, $file, $line) = caller;
1501   my $message = "@_ at $file line $line\n";
1502   Log (undef, $message);
1503   freeze() if @jobstep_todo;
1504   create_output_collection() if @jobstep_todo;
1505   cleanup();
1506   save_meta();
1507   die;
1508 }
1509
1510
1511 sub cleanup
1512 {
1513   return unless $Job;
1514   if ($Job->{'state'} eq 'Cancelled') {
1515     $Job->update_attributes('finished_at' => scalar gmtime);
1516   } else {
1517     $Job->update_attributes('state' => 'Failed');
1518   }
1519 }
1520
1521
1522 sub save_meta
1523 {
1524   my $justcheckpoint = shift; # false if this will be the last meta saved
1525   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1526   return unless log_writer_is_active();
1527
1528   my $loglocator = log_writer_finish();
1529   Log (undef, "log manifest is $loglocator");
1530   $Job->{'log'} = $loglocator;
1531   $Job->update_attributes('log', $loglocator);
1532 }
1533
1534
1535 sub freeze_if_want_freeze
1536 {
1537   if ($main::please_freeze)
1538   {
1539     release_allocation();
1540     if (@_)
1541     {
1542       # kill some srun procs before freeze+stop
1543       map { $proc{$_} = {} } @_;
1544       while (%proc)
1545       {
1546         killem (keys %proc);
1547         select (undef, undef, undef, 0.1);
1548         my $died;
1549         while (($died = waitpid (-1, WNOHANG)) > 0)
1550         {
1551           delete $proc{$died};
1552         }
1553       }
1554     }
1555     freeze();
1556     create_output_collection();
1557     cleanup();
1558     save_meta();
1559     exit 1;
1560   }
1561 }
1562
1563
1564 sub freeze
1565 {
1566   Log (undef, "Freeze not implemented");
1567   return;
1568 }
1569
1570
1571 sub thaw
1572 {
1573   croak ("Thaw not implemented");
1574 }
1575
1576
1577 sub freezequote
1578 {
1579   my $s = shift;
1580   $s =~ s/\\/\\\\/g;
1581   $s =~ s/\n/\\n/g;
1582   return $s;
1583 }
1584
1585
1586 sub freezeunquote
1587 {
1588   my $s = shift;
1589   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1590   return $s;
1591 }
1592
1593
1594 sub srun
1595 {
1596   my $srunargs = shift;
1597   my $execargs = shift;
1598   my $opts = shift || {};
1599   my $stdin = shift;
1600   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1601
1602   $Data::Dumper::Terse = 1;
1603   $Data::Dumper::Indent = 0;
1604   my $show_cmd = Dumper($args);
1605   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1606   $show_cmd =~ s/\n/ /g;
1607   warn "starting: $show_cmd\n";
1608
1609   if (defined $stdin) {
1610     my $child = open STDIN, "-|";
1611     defined $child or die "no fork: $!";
1612     if ($child == 0) {
1613       print $stdin or die $!;
1614       close STDOUT or die $!;
1615       exit 0;
1616     }
1617   }
1618
1619   return system (@$args) if $opts->{fork};
1620
1621   exec @$args;
1622   warn "ENV size is ".length(join(" ",%ENV));
1623   die "exec failed: $!: @$args";
1624 }
1625
1626
1627 sub ban_node_by_slot {
1628   # Don't start any new jobsteps on this node for 60 seconds
1629   my $slotid = shift;
1630   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1631   $slot[$slotid]->{node}->{hold_count}++;
1632   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1633 }
1634
1635 sub must_lock_now
1636 {
1637   my ($lockfile, $error_message) = @_;
1638   open L, ">", $lockfile or croak("$lockfile: $!");
1639   if (!flock L, LOCK_EX|LOCK_NB) {
1640     croak("Can't lock $lockfile: $error_message\n");
1641   }
1642 }
1643
1644 sub find_docker_image {
1645   # Given a Keep locator, check to see if it contains a Docker image.
1646   # If so, return its stream name and Docker hash.
1647   # If not, return undef for both values.
1648   my $locator = shift;
1649   my ($streamname, $filename);
1650   my $image = api_call("collections/get", uuid => $locator);
1651   if ($image) {
1652     foreach my $line (split(/\n/, $image->{manifest_text})) {
1653       my @tokens = split(/\s+/, $line);
1654       next if (!@tokens);
1655       $streamname = shift(@tokens);
1656       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1657         if (defined($filename)) {
1658           return (undef, undef);  # More than one file in the Collection.
1659         } else {
1660           $filename = (split(/:/, $filedata, 3))[2];
1661         }
1662       }
1663     }
1664   }
1665   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1666     return ($streamname, $1);
1667   } else {
1668     return (undef, undef);
1669   }
1670 }
1671
1672 sub retry_count {
1673   # Calculate the number of times an operation should be retried,
1674   # assuming exponential backoff, and that we're willing to retry as
1675   # long as tasks have been running.  Enforce a minimum of 3 retries.
1676   my ($starttime, $endtime, $timediff, $retries);
1677   if (@jobstep) {
1678     $starttime = $jobstep[0]->{starttime};
1679     $endtime = $jobstep[-1]->{finishtime};
1680   }
1681   if (!defined($starttime)) {
1682     $timediff = 0;
1683   } elsif (!defined($endtime)) {
1684     $timediff = time - $starttime;
1685   } else {
1686     $timediff = ($endtime - $starttime) - (time - $endtime);
1687   }
1688   if ($timediff > 0) {
1689     $retries = int(log($timediff) / log(2));
1690   } else {
1691     $retries = 1;  # Use the minimum.
1692   }
1693   return ($retries > 3) ? $retries : 3;
1694 }
1695
1696 sub retry_op {
1697   # Pass in two function references.
1698   # This method will be called with the remaining arguments.
1699   # If it dies, retry it with exponential backoff until it succeeds,
1700   # or until the current retry_count is exhausted.  After each failure
1701   # that can be retried, the second function will be called with
1702   # the current try count (0-based), next try time, and error message.
1703   my $operation = shift;
1704   my $retry_callback = shift;
1705   my $retries = retry_count();
1706   foreach my $try_count (0..$retries) {
1707     my $next_try = time + (2 ** $try_count);
1708     my $result = eval { $operation->(@_); };
1709     if (!$@) {
1710       return $result;
1711     } elsif ($try_count < $retries) {
1712       $retry_callback->($try_count, $next_try, $@);
1713       my $sleep_time = $next_try - time;
1714       sleep($sleep_time) if ($sleep_time > 0);
1715     }
1716   }
1717   # Ensure the error message ends in a newline, so Perl doesn't add
1718   # retry_op's line number to it.
1719   chomp($@);
1720   die($@ . "\n");
1721 }
1722
1723 sub api_call {
1724   # Pass in a /-separated API method name, and arguments for it.
1725   # This function will call that method, retrying as needed until
1726   # the current retry_count is exhausted, with a log on the first failure.
1727   my $method_name = shift;
1728   my $log_api_retry = sub {
1729     my ($try_count, $next_try_at, $errmsg) = @_;
1730     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1731     $errmsg =~ s/\s/ /g;
1732     $errmsg =~ s/\s+$//;
1733     my $retry_msg;
1734     if ($next_try_at < time) {
1735       $retry_msg = "Retrying.";
1736     } else {
1737       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1738       $retry_msg = "Retrying at $next_try_fmt.";
1739     }
1740     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1741   };
1742   my $method = $arv;
1743   foreach my $key (split(/\//, $method_name)) {
1744     $method = $method->{$key};
1745   }
1746   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1747 }
1748
1749 sub exit_status_s {
1750   # Given a $?, return a human-readable exit code string like "0" or
1751   # "1" or "0 with signal 1" or "1 with signal 11".
1752   my $exitcode = shift;
1753   my $s = $exitcode >> 8;
1754   if ($exitcode & 0x7f) {
1755     $s .= " with signal " . ($exitcode & 0x7f);
1756   }
1757   if ($exitcode & 0x80) {
1758     $s .= " with core dump";
1759   }
1760   return $s;
1761 }
1762
1763 sub handle_readall {
1764   # Pass in a glob reference to a file handle.
1765   # Read all its contents and return them as a string.
1766   my $fh_glob_ref = shift;
1767   local $/ = undef;
1768   return <$fh_glob_ref>;
1769 }
1770
1771 sub tar_filename_n {
1772   my $n = shift;
1773   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1774 }
1775
1776 sub add_git_archive {
1777   # Pass in a git archive command as a string or list, a la system().
1778   # This method will save its output to be included in the archive sent to the
1779   # build script.
1780   my $git_input;
1781   $git_tar_count++;
1782   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1783     croak("Failed to save git archive: $!");
1784   }
1785   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1786   close($git_input);
1787   waitpid($git_pid, 0);
1788   close(GIT_ARCHIVE);
1789   if ($?) {
1790     croak("Failed to save git archive: git exited " . exit_status_s($?));
1791   }
1792 }
1793
1794 sub combined_git_archive {
1795   # Combine all saved tar archives into a single archive, then return its
1796   # contents in a string.  Return undef if no archives have been saved.
1797   if ($git_tar_count < 1) {
1798     return undef;
1799   }
1800   my $base_tar_name = tar_filename_n(1);
1801   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1802     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1803     if ($tar_exit != 0) {
1804       croak("Error preparing build archive: tar -A exited " .
1805             exit_status_s($tar_exit));
1806     }
1807   }
1808   if (!open(GIT_TAR, "<", $base_tar_name)) {
1809     croak("Could not open build archive: $!");
1810   }
1811   my $tar_contents = handle_readall(\*GIT_TAR);
1812   close(GIT_TAR);
1813   return $tar_contents;
1814 }
1815
1816 __DATA__
1817 #!/usr/bin/perl
1818 #
1819 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1820 # server invokes this script on individual compute nodes, or localhost if we're
1821 # running a job locally.  It gets called in two modes:
1822 #
1823 # * No arguments: Installation mode.  Read a tar archive from the DATA
1824 #   file handle; it includes the Crunch script's source code, and
1825 #   maybe SDKs as well.  Those should be installed in the proper
1826 #   locations.  This runs outside of any Docker container, so don't try to
1827 #   introspect Crunch's runtime environment.
1828 #
1829 # * With arguments: Crunch script run mode.  This script should set up the
1830 #   environment, then run the command specified in the arguments.  This runs
1831 #   inside any Docker container.
1832
1833 use Fcntl ':flock';
1834 use File::Path qw( make_path remove_tree );
1835 use POSIX qw(getcwd);
1836
1837 use constant TASK_TEMPFAIL => 111;
1838
1839 # Map SDK subdirectories to the path environments they belong to.
1840 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1841
1842 my $destdir = $ENV{"CRUNCH_SRC"};
1843 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1844 my $repo = $ENV{"CRUNCH_SRC_URL"};
1845 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1846 my $job_work = $ENV{"JOB_WORK"};
1847 my $task_work = $ENV{"TASK_WORK"};
1848
1849 for my $dir ($destdir, $job_work, $task_work) {
1850   if ($dir) {
1851     make_path $dir;
1852     -e $dir or die "Failed to create temporary directory ($dir): $!";
1853   }
1854 }
1855
1856 if ($task_work) {
1857   remove_tree($task_work, {keep_root => 1});
1858 }
1859
1860 open(STDOUT_ORIG, ">&", STDOUT);
1861 open(STDERR_ORIG, ">&", STDERR);
1862 open(STDOUT, ">>", "$destdir.log");
1863 open(STDERR, ">&", STDOUT);
1864
1865 ### Crunch script run mode
1866 if (@ARGV) {
1867   # We want to do routine logging during task 0 only.  This gives the user
1868   # the information they need, but avoids repeating the information for every
1869   # task.
1870   my $Log;
1871   if ($ENV{TASK_SEQUENCE} eq "0") {
1872     $Log = sub {
1873       my $msg = shift;
1874       printf STDERR_ORIG "[Crunch] $msg\n", @_;
1875     };
1876   } else {
1877     $Log = sub { };
1878   }
1879
1880   my $python_src = "$install_dir/python";
1881   my $venv_dir = "$job_work/.arvados.venv";
1882   my $venv_built = -e "$venv_dir/bin/activate";
1883   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1884     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
1885                  "--python=python2.7", $venv_dir);
1886     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1887     $venv_built = 1;
1888     $Log->("Built Python SDK virtualenv");
1889   }
1890
1891   my $pip_bin = "pip";
1892   if ($venv_built) {
1893     $Log->("Running in Python SDK virtualenv");
1894     $pip_bin = "$venv_dir/bin/pip";
1895     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
1896     @ARGV = ("/bin/sh", "-ec",
1897              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
1898   } elsif (-d $python_src) {
1899     $Log->("Warning: virtualenv not found inside Docker container default " .
1900            "\$PATH. Can't install Python SDK.");
1901   }
1902
1903   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
1904   if ($pkgs) {
1905     $Log->("Using Arvados SDK:");
1906     foreach my $line (split /\n/, $pkgs) {
1907       $Log->($line);
1908     }
1909   } else {
1910     $Log->("Arvados SDK packages not found");
1911   }
1912
1913   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
1914     my $sdk_path = "$install_dir/$sdk_dir";
1915     if (-d $sdk_path) {
1916       if ($ENV{$sdk_envkey}) {
1917         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
1918       } else {
1919         $ENV{$sdk_envkey} = $sdk_path;
1920       }
1921       $Log->("Arvados SDK added to %s", $sdk_envkey);
1922     }
1923   }
1924
1925   close(STDOUT);
1926   close(STDERR);
1927   open(STDOUT, ">&", STDOUT_ORIG);
1928   open(STDERR, ">&", STDERR_ORIG);
1929   exec(@ARGV);
1930   die "Cannot exec `@ARGV`: $!";
1931 }
1932
1933 ### Installation mode
1934 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1935 flock L, LOCK_EX;
1936 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1937   # This version already installed -> nothing to do.
1938   exit(0);
1939 }
1940
1941 unlink "$destdir.commit";
1942 mkdir $destdir;
1943
1944 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
1945   die "Error launching 'tar -xC $destdir': $!";
1946 }
1947 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
1948 # get SIGPIPE.  We must feed it data incrementally.
1949 my $tar_input;
1950 while (read(DATA, $tar_input, 65536)) {
1951   print TARX $tar_input;
1952 }
1953 if(!close(TARX)) {
1954   die "'tar -xC $destdir' exited $?: $!";
1955 }
1956
1957 mkdir $install_dir;
1958
1959 my $sdk_root = "$destdir/.arvados.sdk/sdk";
1960 if (-d $sdk_root) {
1961   foreach my $sdk_lang (("python",
1962                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
1963     if (-d "$sdk_root/$sdk_lang") {
1964       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
1965         die "Failed to install $sdk_lang SDK: $!";
1966       }
1967     }
1968   }
1969 }
1970
1971 my $python_dir = "$install_dir/python";
1972 if ((-d $python_dir) and can_run("python2.7") and
1973     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
1974   # egg_info failed, probably when it asked git for a build tag.
1975   # Specify no build tag.
1976   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
1977   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
1978   close($pysdk_cfg);
1979 }
1980
1981 if (-e "$destdir/crunch_scripts/install") {
1982     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
1983 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1984     # Old version
1985     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
1986 } elsif (-e "./install.sh") {
1987     shell_or_die (undef, "./install.sh", $install_dir);
1988 }
1989
1990 if ($commit) {
1991     unlink "$destdir.commit.new";
1992     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1993     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1994 }
1995
1996 close L;
1997
1998 sub can_run {
1999   my $command_name = shift;
2000   open(my $which, "-|", "which", $command_name);
2001   while (<$which>) { }
2002   close($which);
2003   return ($? == 0);
2004 }
2005
2006 sub shell_or_die
2007 {
2008   my $exitcode = shift;
2009
2010   if ($ENV{"DEBUG"}) {
2011     print STDERR "@_\n";
2012   }
2013   if (system (@_) != 0) {
2014     my $err = $!;
2015     my $code = $?;
2016     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2017     open STDERR, ">&STDERR_ORIG";
2018     system ("cat $destdir.log >&2");
2019     warn "@_ failed ($err): $exitstatus";
2020     if (defined($exitcode)) {
2021       exit $exitcode;
2022     }
2023     else {
2024       exit (($code >> 8) || 1);
2025     }
2026   }
2027 }
2028
2029 __DATA__