Merge branch 'master' into 6093-refresh-docs
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105   if ($ENV{"USER"} ne "crunch" && $< != 0) {
106     # use a tmp dir unique for my uid
107     $ENV{"CRUNCH_TMP"} .= "-$<";
108   }
109 }
110
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
114 }
115
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
120
121 my $force_unlock;
122 my $git_dir;
123 my $jobspec;
124 my $job_api_token;
125 my $no_clear_tmp;
126 my $resume_stash;
127 GetOptions('force-unlock' => \$force_unlock,
128            'git-dir=s' => \$git_dir,
129            'job=s' => \$jobspec,
130            'job-api-token=s' => \$job_api_token,
131            'no-clear-tmp' => \$no_clear_tmp,
132            'resume-stash=s' => \$resume_stash,
133     );
134
135 if (defined $job_api_token) {
136   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
137 }
138
139 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
140 my $local_job = 0;
141
142
143 $SIG{'USR1'} = sub
144 {
145   $main::ENV{CRUNCH_DEBUG} = 1;
146 };
147 $SIG{'USR2'} = sub
148 {
149   $main::ENV{CRUNCH_DEBUG} = 0;
150 };
151
152
153
154 my $arv = Arvados->new('apiVersion' => 'v1');
155
156 my $Job;
157 my $job_id;
158 my $dbh;
159 my $sth;
160 my @jobstep;
161
162 my $User = api_call("users/current");
163
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   if (!$force_unlock) {
169     # Claim this job, and make sure nobody else does
170     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
171     if ($@) {
172       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
173       exit EX_TEMPFAIL;
174     };
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189   $Job->{'state'} = 'Running';
190
191   $Job = api_call("jobs/create", job => $Job);
192 }
193 $job_id = $Job->{'uuid'};
194
195 my $keep_logfile = $job_id . '.log.txt';
196 log_writer_start($keep_logfile);
197
198 $Job->{'runtime_constraints'} ||= {};
199 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
200 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
201
202 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
203 if ($? == 0) {
204   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
205   chomp($gem_versions);
206   chop($gem_versions);  # Closing parentheses
207 } else {
208   $gem_versions = "";
209 }
210 Log(undef,
211     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
212
213 Log (undef, "check slurm allocation");
214 my @slot;
215 my @node;
216 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
217 my @sinfo;
218 if (!$have_slurm)
219 {
220   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
221   push @sinfo, "$localcpus localhost";
222 }
223 if (exists $ENV{SLURM_NODELIST})
224 {
225   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
226 }
227 foreach (@sinfo)
228 {
229   my ($ncpus, $slurm_nodelist) = split;
230   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
231
232   my @nodelist;
233   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
234   {
235     my $nodelist = $1;
236     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
237     {
238       my $ranges = $1;
239       foreach (split (",", $ranges))
240       {
241         my ($a, $b);
242         if (/(\d+)-(\d+)/)
243         {
244           $a = $1;
245           $b = $2;
246         }
247         else
248         {
249           $a = $_;
250           $b = $_;
251         }
252         push @nodelist, map {
253           my $n = $nodelist;
254           $n =~ s/\[[-,\d]+\]/$_/;
255           $n;
256         } ($a..$b);
257       }
258     }
259     else
260     {
261       push @nodelist, $nodelist;
262     }
263   }
264   foreach my $nodename (@nodelist)
265   {
266     Log (undef, "node $nodename - $ncpus slots");
267     my $node = { name => $nodename,
268                  ncpus => $ncpus,
269                  losing_streak => 0,
270                  hold_until => 0 };
271     foreach my $cpu (1..$ncpus)
272     {
273       push @slot, { node => $node,
274                     cpu => $cpu };
275     }
276   }
277   push @node, @nodelist;
278 }
279
280
281
282 # Ensure that we get one jobstep running on each allocated node before
283 # we start overloading nodes with concurrent steps
284
285 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
286
287
288 $Job->update_attributes(
289   'tasks_summary' => { 'failed' => 0,
290                        'todo' => 1,
291                        'running' => 0,
292                        'done' => 0 });
293
294 Log (undef, "start");
295 $SIG{'INT'} = sub { $main::please_freeze = 1; };
296 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
297 $SIG{'TERM'} = \&croak;
298 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
299 $SIG{'ALRM'} = sub { $main::please_info = 1; };
300 $SIG{'CONT'} = sub { $main::please_continue = 1; };
301 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
302
303 $main::please_freeze = 0;
304 $main::please_info = 0;
305 $main::please_continue = 0;
306 $main::please_refresh = 0;
307 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
308
309 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
310 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
311 $ENV{"JOB_UUID"} = $job_id;
312
313
314 my @jobstep_todo = ();
315 my @jobstep_done = ();
316 my @jobstep_tomerge = ();
317 my $jobstep_tomerge_level = 0;
318 my $squeue_checked;
319 my $squeue_kill_checked;
320 my $latest_refresh = scalar time;
321
322
323
324 if (defined $Job->{thawedfromkey})
325 {
326   thaw ($Job->{thawedfromkey});
327 }
328 else
329 {
330   my $first_task = api_call("job_tasks/create", job_task => {
331     'job_uuid' => $Job->{'uuid'},
332     'sequence' => 0,
333     'qsequence' => 0,
334     'parameters' => {},
335   });
336   push @jobstep, { 'level' => 0,
337                    'failures' => 0,
338                    'arvados_task' => $first_task,
339                  };
340   push @jobstep_todo, 0;
341 }
342
343
344 if (!$have_slurm)
345 {
346   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
347 }
348
349 my $build_script = handle_readall(\*DATA);
350 my $nodelist = join(",", @node);
351 my $git_tar_count = 0;
352
353 if (!defined $no_clear_tmp) {
354   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
355   Log (undef, "Clean work dirs");
356
357   my $cleanpid = fork();
358   if ($cleanpid == 0)
359   {
360     # Find FUSE mounts that look like Keep mounts (the mount path has the
361     # word "keep") and unmount them.  Then clean up work directories.
362     # TODO: When #5036 is done and widely deployed, we can get rid of the
363     # regular expression and just unmount everything with type fuse.keep.
364     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
365           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
366     exit (1);
367   }
368   while (1)
369   {
370     last if $cleanpid == waitpid (-1, WNOHANG);
371     freeze_if_want_freeze ($cleanpid);
372     select (undef, undef, undef, 0.1);
373   }
374   Log (undef, "Cleanup command exited ".exit_status_s($?));
375 }
376
377 # If this job requires a Docker image, install that.
378 my $docker_bin = "/usr/bin/docker.io";
379 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
380 if ($docker_locator = $Job->{docker_image_locator}) {
381   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
382   if (!$docker_hash)
383   {
384     croak("No Docker image hash found from locator $docker_locator");
385   }
386   $docker_stream =~ s/^\.//;
387   my $docker_install_script = qq{
388 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
389     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
390 fi
391 };
392   my $docker_pid = fork();
393   if ($docker_pid == 0)
394   {
395     srun (["srun", "--nodelist=" . join(',', @node)],
396           ["/bin/sh", "-ec", $docker_install_script]);
397     exit ($?);
398   }
399   while (1)
400   {
401     last if $docker_pid == waitpid (-1, WNOHANG);
402     freeze_if_want_freeze ($docker_pid);
403     select (undef, undef, undef, 0.1);
404   }
405   if ($? != 0)
406   {
407     croak("Installing Docker image from $docker_locator exited "
408           .exit_status_s($?));
409   }
410
411   # Determine whether this version of Docker supports memory+swap limits.
412   srun(["srun", "--nodelist=" . $node[0]],
413        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
414       {fork => 1});
415   $docker_limitmem = ($? == 0);
416
417   if ($Job->{arvados_sdk_version}) {
418     # The job also specifies an Arvados SDK version.  Add the SDKs to the
419     # tar file for the build script to install.
420     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
421                        $Job->{arvados_sdk_version}));
422     add_git_archive("git", "--git-dir=$git_dir", "archive",
423                     "--prefix=.arvados.sdk/",
424                     $Job->{arvados_sdk_version}, "sdk");
425   }
426 }
427
428 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
429   # If script_version looks like an absolute path, *and* the --git-dir
430   # argument was not given -- which implies we were not invoked by
431   # crunch-dispatch -- we will use the given path as a working
432   # directory instead of resolving script_version to a git commit (or
433   # doing anything else with git).
434   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
435   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
436 }
437 else {
438   # Resolve the given script_version to a git commit sha1. Also, if
439   # the repository is remote, clone it into our local filesystem: this
440   # ensures "git archive" will work, and is necessary to reliably
441   # resolve a symbolic script_version like "master^".
442   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
443
444   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
445
446   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
447
448   # If we're running under crunch-dispatch, it will have already
449   # pulled the appropriate source tree into its own repository, and
450   # given us that repo's path as $git_dir.
451   #
452   # If we're running a "local" job, we might have to fetch content
453   # from a remote repository.
454   #
455   # (Currently crunch-dispatch gives a local path with --git-dir, but
456   # we might as well accept URLs there too in case it changes its
457   # mind.)
458   my $repo = $git_dir || $Job->{'repository'};
459
460   # Repository can be remote or local. If remote, we'll need to fetch it
461   # to a local dir before doing `git log` et al.
462   my $repo_location;
463
464   if ($repo =~ m{://|^[^/]*:}) {
465     # $repo is a git url we can clone, like git:// or https:// or
466     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
467     # not recognized here because distinguishing that from a local
468     # path is too fragile. If you really need something strange here,
469     # use the ssh:// form.
470     $repo_location = 'remote';
471   } elsif ($repo =~ m{^\.*/}) {
472     # $repo is a local path to a git index. We'll also resolve ../foo
473     # to ../foo/.git if the latter is a directory. To help
474     # disambiguate local paths from named hosted repositories, this
475     # form must be given as ./ or ../ if it's a relative path.
476     if (-d "$repo/.git") {
477       $repo = "$repo/.git";
478     }
479     $repo_location = 'local';
480   } else {
481     # $repo is none of the above. It must be the name of a hosted
482     # repository.
483     my $arv_repo_list = api_call("repositories/list",
484                                  'filters' => [['name','=',$repo]]);
485     my @repos_found = @{$arv_repo_list->{'items'}};
486     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
487     if ($n_found > 0) {
488       Log(undef, "Repository '$repo' -> "
489           . join(", ", map { $_->{'uuid'} } @repos_found));
490     }
491     if ($n_found != 1) {
492       croak("Error: Found $n_found repositories with name '$repo'.");
493     }
494     $repo = $repos_found[0]->{'fetch_url'};
495     $repo_location = 'remote';
496   }
497   Log(undef, "Using $repo_location repository '$repo'");
498   $ENV{"CRUNCH_SRC_URL"} = $repo;
499
500   # Resolve given script_version (we'll call that $treeish here) to a
501   # commit sha1 ($commit).
502   my $treeish = $Job->{'script_version'};
503   my $commit;
504   if ($repo_location eq 'remote') {
505     # We minimize excess object-fetching by re-using the same bare
506     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
507     # just keep adding remotes to it as needed.
508     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
509     my $gitcmd = "git --git-dir=\Q$local_repo\E";
510
511     # Set up our local repo for caching remote objects, making
512     # archives, etc.
513     if (!-d $local_repo) {
514       make_path($local_repo) or croak("Error: could not create $local_repo");
515     }
516     # This works (exits 0 and doesn't delete fetched objects) even
517     # if $local_repo is already initialized:
518     `$gitcmd init --bare`;
519     if ($?) {
520       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
521     }
522
523     # If $treeish looks like a hash (or abbrev hash) we look it up in
524     # our local cache first, since that's cheaper. (We don't want to
525     # do that with tags/branches though -- those change over time, so
526     # they should always be resolved by the remote repo.)
527     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
528       # Hide stderr because it's normal for this to fail:
529       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
530       if ($? == 0 &&
531           # Careful not to resolve a branch named abcdeff to commit 1234567:
532           $sha1 =~ /^$treeish/ &&
533           $sha1 =~ /^([0-9a-f]{40})$/s) {
534         $commit = $1;
535         Log(undef, "Commit $commit already present in $local_repo");
536       }
537     }
538
539     if (!defined $commit) {
540       # If $treeish isn't just a hash or abbrev hash, or isn't here
541       # yet, we need to fetch the remote to resolve it correctly.
542
543       # First, remove all local heads. This prevents a name that does
544       # not exist on the remote from resolving to (or colliding with)
545       # a previously fetched branch or tag (possibly from a different
546       # remote).
547       remove_tree("$local_repo/refs/heads", {keep_root => 1});
548
549       Log(undef, "Fetching objects from $repo to $local_repo");
550       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
551       if ($?) {
552         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
553       }
554     }
555
556     # Now that the data is all here, we will use our local repo for
557     # the rest of our git activities.
558     $repo = $local_repo;
559   }
560
561   my $gitcmd = "git --git-dir=\Q$repo\E";
562   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
563   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
564     croak("`$gitcmd rev-list` exited "
565           .exit_status_s($?)
566           .", '$treeish' not found. Giving up.");
567   }
568   $commit = $1;
569   Log(undef, "Version $treeish is commit $commit");
570
571   if ($commit ne $Job->{'script_version'}) {
572     # Record the real commit id in the database, frozentokey, logs,
573     # etc. -- instead of an abbreviation or a branch name which can
574     # become ambiguous or point to a different commit in the future.
575     if (!$Job->update_attributes('script_version' => $commit)) {
576       croak("Error: failed to update job's script_version attribute");
577     }
578   }
579
580   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
581   add_git_archive("$gitcmd archive ''\Q$commit\E");
582 }
583
584 my $git_archive = combined_git_archive();
585 if (!defined $git_archive) {
586   Log(undef, "Skip install phase (no git archive)");
587   if ($have_slurm) {
588     Log(undef, "Warning: This probably means workers have no source tree!");
589   }
590 }
591 else {
592   Log(undef, "Run install script on all workers");
593
594   my @srunargs = ("srun",
595                   "--nodelist=$nodelist",
596                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
597   my @execargs = ("sh", "-c",
598                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
599
600   $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
601   my ($install_stderr_r, $install_stderr_w);
602   pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
603   set_nonblocking($install_stderr_r);
604   my $installpid = fork();
605   if ($installpid == 0)
606   {
607     close($install_stderr_r);
608     fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
609     open(STDOUT, ">&", $install_stderr_w);
610     open(STDERR, ">&", $install_stderr_w);
611     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
612     exit (1);
613   }
614   close($install_stderr_w);
615   my $stderr_buf = '';
616   while ($installpid != waitpid(-1, WNOHANG)) {
617     freeze_if_want_freeze ($installpid);
618     # Wait up to 0.1 seconds for something to appear on stderr, then
619     # do a non-blocking read.
620     my $bits = fhbits($install_stderr_r);
621     select ($bits, undef, $bits, 0.1);
622     if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
623     {
624       while ($stderr_buf =~ /^(.*?)\n/) {
625         my $line = $1;
626         substr $stderr_buf, 0, 1+length($line), "";
627         Log(undef, "stderr $line");
628       }
629     }
630   }
631   my $install_exited = $?;
632   close($install_stderr_r);
633   if (length($stderr_buf) > 0) {
634     Log(undef, "stderr $stderr_buf")
635   }
636
637   Log (undef, "Install script exited ".exit_status_s($install_exited));
638   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
639     unlink($tar_filename);
640   }
641   exit (1) if $install_exited != 0;
642 }
643
644 foreach (qw (script script_version script_parameters runtime_constraints))
645 {
646   Log (undef,
647        "$_ " .
648        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
649 }
650 foreach (split (/\n/, $Job->{knobs}))
651 {
652   Log (undef, "knob " . $_);
653 }
654
655
656
657 $main::success = undef;
658
659
660
661 ONELEVEL:
662
663 my $thisround_succeeded = 0;
664 my $thisround_failed = 0;
665 my $thisround_failed_multiple = 0;
666
667 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
668                        or $a <=> $b } @jobstep_todo;
669 my $level = $jobstep[$jobstep_todo[0]]->{level};
670
671 my $initial_tasks_this_level = 0;
672 foreach my $id (@jobstep_todo) {
673   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
674 }
675
676 # If the number of tasks scheduled at this level #T is smaller than the number
677 # of slots available #S, only use the first #T slots, or the first slot on
678 # each node, whichever number is greater.
679 #
680 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
681 # based on these numbers.  Using fewer slots makes more resources available
682 # to each individual task, which should normally be a better strategy when
683 # there are fewer of them running with less parallelism.
684 #
685 # Note that this calculation is not redone if the initial tasks at
686 # this level queue more tasks at the same level.  This may harm
687 # overall task throughput for that level.
688 my @freeslot;
689 if ($initial_tasks_this_level < @node) {
690   @freeslot = (0..$#node);
691 } elsif ($initial_tasks_this_level < @slot) {
692   @freeslot = (0..$initial_tasks_this_level - 1);
693 } else {
694   @freeslot = (0..$#slot);
695 }
696 my $round_num_freeslots = scalar(@freeslot);
697
698 my %round_max_slots = ();
699 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
700   my $this_slot = $slot[$freeslot[$ii]];
701   my $node_name = $this_slot->{node}->{name};
702   $round_max_slots{$node_name} ||= $this_slot->{cpu};
703   last if (scalar(keys(%round_max_slots)) >= @node);
704 }
705
706 Log(undef, "start level $level with $round_num_freeslots slots");
707 my %proc;
708 my @holdslot;
709 my %reader;
710 my $progress_is_dirty = 1;
711 my $progress_stats_updated = 0;
712
713 update_progress_stats();
714
715
716 THISROUND:
717 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
718 {
719   my $id = $jobstep_todo[$todo_ptr];
720   my $Jobstep = $jobstep[$id];
721   if ($Jobstep->{level} != $level)
722   {
723     next;
724   }
725
726   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
727   set_nonblocking($reader{$id});
728
729   my $childslot = $freeslot[0];
730   my $childnode = $slot[$childslot]->{node};
731   my $childslotname = join (".",
732                             $slot[$childslot]->{node}->{name},
733                             $slot[$childslot]->{cpu});
734
735   my $childpid = fork();
736   if ($childpid == 0)
737   {
738     $SIG{'INT'} = 'DEFAULT';
739     $SIG{'QUIT'} = 'DEFAULT';
740     $SIG{'TERM'} = 'DEFAULT';
741
742     foreach (values (%reader))
743     {
744       close($_);
745     }
746     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
747     open(STDOUT,">&writer");
748     open(STDERR,">&writer");
749
750     undef $dbh;
751     undef $sth;
752
753     delete $ENV{"GNUPGHOME"};
754     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
755     $ENV{"TASK_QSEQUENCE"} = $id;
756     $ENV{"TASK_SEQUENCE"} = $level;
757     $ENV{"JOB_SCRIPT"} = $Job->{script};
758     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
759       $param =~ tr/a-z/A-Z/;
760       $ENV{"JOB_PARAMETER_$param"} = $value;
761     }
762     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
763     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
764     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
765     $ENV{"HOME"} = $ENV{"TASK_WORK"};
766     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
767     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
768     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
769     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
770
771     $ENV{"GZIP"} = "-n";
772
773     my @srunargs = (
774       "srun",
775       "--nodelist=".$childnode->{name},
776       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
777       "--job-name=$job_id.$id.$$",
778         );
779     my $command =
780         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
781         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
782         ."&& cd $ENV{CRUNCH_TMP} "
783         # These environment variables get used explicitly later in
784         # $command.  No tool is expected to read these values directly.
785         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
786         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
787         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
788         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
789     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
790     if ($docker_hash)
791     {
792       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
793       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
794       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
795       # We only set memory limits if Docker lets us limit both memory and swap.
796       # Memory limits alone have been supported longer, but subprocesses tend
797       # to get SIGKILL if they exceed that without any swap limit set.
798       # See #5642 for additional background.
799       if ($docker_limitmem) {
800         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
801       }
802
803       # Dynamically configure the container to use the host system as its
804       # DNS server.  Get the host's global addresses from the ip command,
805       # and turn them into docker --dns options using gawk.
806       $command .=
807           q{$(ip -o address show scope global |
808               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
809
810       # The source tree and $destdir directory (which we have
811       # installed on the worker host) are available in the container,
812       # under the same path.
813       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
814       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
815
816       # Currently, we make arv-mount's mount point appear at /keep
817       # inside the container (instead of using the same path as the
818       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
819       # crunch scripts and utilities must not rely on this. They must
820       # use $TASK_KEEPMOUNT.
821       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
822       $ENV{TASK_KEEPMOUNT} = "/keep";
823
824       # TASK_WORK is almost exactly like a docker data volume: it
825       # starts out empty, is writable, and persists until no
826       # containers use it any more. We don't use --volumes-from to
827       # share it with other containers: it is only accessible to this
828       # task, and it goes away when this task stops.
829       #
830       # However, a docker data volume is writable only by root unless
831       # the mount point already happens to exist in the container with
832       # different permissions. Therefore, we [1] assume /tmp already
833       # exists in the image and is writable by the crunch user; [2]
834       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
835       # writable if they are created by docker while setting up the
836       # other --volumes); and [3] create $TASK_WORK inside the
837       # container using $build_script.
838       $command .= "--volume=/tmp ";
839       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
840       $ENV{"HOME"} = $ENV{"TASK_WORK"};
841       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
842
843       # TODO: Share a single JOB_WORK volume across all task
844       # containers on a given worker node, and delete it when the job
845       # ends (and, in case that doesn't work, when the next job
846       # starts).
847       #
848       # For now, use the same approach as TASK_WORK above.
849       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
850
851       while (my ($env_key, $env_val) = each %ENV)
852       {
853         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
854           $command .= "--env=\Q$env_key=$env_val\E ";
855         }
856       }
857       $command .= "--env=\QHOME=$ENV{HOME}\E ";
858       $command .= "\Q$docker_hash\E ";
859       $command .= "stdbuf --output=0 --error=0 ";
860       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
861     } else {
862       # Non-docker run
863       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
864       $command .= "stdbuf --output=0 --error=0 ";
865       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
866     }
867
868     my @execargs = ('bash', '-c', $command);
869     srun (\@srunargs, \@execargs, undef, $build_script);
870     # exec() failed, we assume nothing happened.
871     die "srun() failed on build script\n";
872   }
873   close("writer");
874   if (!defined $childpid)
875   {
876     close $reader{$id};
877     delete $reader{$id};
878     next;
879   }
880   shift @freeslot;
881   $proc{$childpid} = { jobstep => $id,
882                        time => time,
883                        slot => $childslot,
884                        jobstepname => "$job_id.$id.$childpid",
885                      };
886   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
887   $slot[$childslot]->{pid} = $childpid;
888
889   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
890   Log ($id, "child $childpid started on $childslotname");
891   $Jobstep->{starttime} = time;
892   $Jobstep->{node} = $childnode->{name};
893   $Jobstep->{slotindex} = $childslot;
894   delete $Jobstep->{stderr};
895   delete $Jobstep->{finishtime};
896
897   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
898   $Jobstep->{'arvados_task'}->save;
899
900   splice @jobstep_todo, $todo_ptr, 1;
901   --$todo_ptr;
902
903   $progress_is_dirty = 1;
904
905   while (!@freeslot
906          ||
907          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
908   {
909     last THISROUND if $main::please_freeze || defined($main::success);
910     if ($main::please_info)
911     {
912       $main::please_info = 0;
913       freeze();
914       create_output_collection();
915       save_meta(1);
916       update_progress_stats();
917     }
918     my $gotsome
919         = readfrompipes ()
920         + reapchildren ();
921     if (!$gotsome)
922     {
923       check_refresh_wanted();
924       check_squeue();
925       update_progress_stats();
926       select (undef, undef, undef, 0.1);
927     }
928     elsif (time - $progress_stats_updated >= 30)
929     {
930       update_progress_stats();
931     }
932     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
933         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
934     {
935       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
936           .($thisround_failed+$thisround_succeeded)
937           .") -- giving up on this round";
938       Log (undef, $message);
939       last THISROUND;
940     }
941
942     # move slots from freeslot to holdslot (or back to freeslot) if necessary
943     for (my $i=$#freeslot; $i>=0; $i--) {
944       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
945         push @holdslot, (splice @freeslot, $i, 1);
946       }
947     }
948     for (my $i=$#holdslot; $i>=0; $i--) {
949       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
950         push @freeslot, (splice @holdslot, $i, 1);
951       }
952     }
953
954     # give up if no nodes are succeeding
955     if (!grep { $_->{node}->{losing_streak} == 0 &&
956                     $_->{node}->{hold_count} < 4 } @slot) {
957       my $message = "Every node has failed -- giving up on this round";
958       Log (undef, $message);
959       last THISROUND;
960     }
961   }
962 }
963
964
965 push @freeslot, splice @holdslot;
966 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
967
968
969 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
970 while (%proc)
971 {
972   if ($main::please_continue) {
973     $main::please_continue = 0;
974     goto THISROUND;
975   }
976   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
977   readfrompipes ();
978   if (!reapchildren())
979   {
980     check_refresh_wanted();
981     check_squeue();
982     update_progress_stats();
983     select (undef, undef, undef, 0.1);
984     killem (keys %proc) if $main::please_freeze;
985   }
986 }
987
988 update_progress_stats();
989 freeze_if_want_freeze();
990
991
992 if (!defined $main::success)
993 {
994   if (@jobstep_todo &&
995       $thisround_succeeded == 0 &&
996       ($thisround_failed == 0 || $thisround_failed > 4))
997   {
998     my $message = "stop because $thisround_failed tasks failed and none succeeded";
999     Log (undef, $message);
1000     $main::success = 0;
1001   }
1002   if (!@jobstep_todo)
1003   {
1004     $main::success = 1;
1005   }
1006 }
1007
1008 goto ONELEVEL if !defined $main::success;
1009
1010
1011 release_allocation();
1012 freeze();
1013 my $collated_output = &create_output_collection();
1014
1015 if (!$collated_output) {
1016   Log (undef, "Failed to write output collection");
1017 }
1018 else {
1019   Log(undef, "job output $collated_output");
1020   $Job->update_attributes('output' => $collated_output);
1021 }
1022
1023 Log (undef, "finish");
1024
1025 save_meta();
1026
1027 my $final_state;
1028 if ($collated_output && $main::success) {
1029   $final_state = 'Complete';
1030 } else {
1031   $final_state = 'Failed';
1032 }
1033 $Job->update_attributes('state' => $final_state);
1034
1035 exit (($final_state eq 'Complete') ? 0 : 1);
1036
1037
1038
1039 sub update_progress_stats
1040 {
1041   $progress_stats_updated = time;
1042   return if !$progress_is_dirty;
1043   my ($todo, $done, $running) = (scalar @jobstep_todo,
1044                                  scalar @jobstep_done,
1045                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1046   $Job->{'tasks_summary'} ||= {};
1047   $Job->{'tasks_summary'}->{'todo'} = $todo;
1048   $Job->{'tasks_summary'}->{'done'} = $done;
1049   $Job->{'tasks_summary'}->{'running'} = $running;
1050   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1051   Log (undef, "status: $done done, $running running, $todo todo");
1052   $progress_is_dirty = 0;
1053 }
1054
1055
1056
1057 sub reapchildren
1058 {
1059   my $pid = waitpid (-1, WNOHANG);
1060   return 0 if $pid <= 0;
1061
1062   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1063                   . "."
1064                   . $slot[$proc{$pid}->{slot}]->{cpu});
1065   my $jobstepid = $proc{$pid}->{jobstep};
1066   my $elapsed = time - $proc{$pid}->{time};
1067   my $Jobstep = $jobstep[$jobstepid];
1068
1069   my $childstatus = $?;
1070   my $exitvalue = $childstatus >> 8;
1071   my $exitinfo = "exit ".exit_status_s($childstatus);
1072   $Jobstep->{'arvados_task'}->reload;
1073   my $task_success = $Jobstep->{'arvados_task'}->{success};
1074
1075   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1076
1077   if (!defined $task_success) {
1078     # task did not indicate one way or the other --> fail
1079     $Jobstep->{'arvados_task'}->{success} = 0;
1080     $Jobstep->{'arvados_task'}->save;
1081     $task_success = 0;
1082   }
1083
1084   if (!$task_success)
1085   {
1086     my $temporary_fail;
1087     $temporary_fail ||= $Jobstep->{node_fail};
1088     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1089
1090     ++$thisround_failed;
1091     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1092
1093     # Check for signs of a failed or misconfigured node
1094     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1095         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1096       # Don't count this against jobstep failure thresholds if this
1097       # node is already suspected faulty and srun exited quickly
1098       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1099           $elapsed < 5) {
1100         Log ($jobstepid, "blaming failure on suspect node " .
1101              $slot[$proc{$pid}->{slot}]->{node}->{name});
1102         $temporary_fail ||= 1;
1103       }
1104       ban_node_by_slot($proc{$pid}->{slot});
1105     }
1106
1107     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1108                              ++$Jobstep->{'failures'},
1109                              $temporary_fail ? 'temporary ' : 'permanent',
1110                              $elapsed));
1111
1112     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1113       # Give up on this task, and the whole job
1114       $main::success = 0;
1115     }
1116     # Put this task back on the todo queue
1117     push @jobstep_todo, $jobstepid;
1118     $Job->{'tasks_summary'}->{'failed'}++;
1119   }
1120   else
1121   {
1122     ++$thisround_succeeded;
1123     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1124     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1125     push @jobstep_done, $jobstepid;
1126     Log ($jobstepid, "success in $elapsed seconds");
1127   }
1128   $Jobstep->{exitcode} = $childstatus;
1129   $Jobstep->{finishtime} = time;
1130   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1131   $Jobstep->{'arvados_task'}->save;
1132   process_stderr ($jobstepid, $task_success);
1133   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1134                            length($Jobstep->{'arvados_task'}->{output}),
1135                            $Jobstep->{'arvados_task'}->{output}));
1136
1137   close $reader{$jobstepid};
1138   delete $reader{$jobstepid};
1139   delete $slot[$proc{$pid}->{slot}]->{pid};
1140   push @freeslot, $proc{$pid}->{slot};
1141   delete $proc{$pid};
1142
1143   if ($task_success) {
1144     # Load new tasks
1145     my $newtask_list = [];
1146     my $newtask_results;
1147     do {
1148       $newtask_results = api_call(
1149         "job_tasks/list",
1150         'where' => {
1151           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1152         },
1153         'order' => 'qsequence',
1154         'offset' => scalar(@$newtask_list),
1155       );
1156       push(@$newtask_list, @{$newtask_results->{items}});
1157     } while (@{$newtask_results->{items}});
1158     foreach my $arvados_task (@$newtask_list) {
1159       my $jobstep = {
1160         'level' => $arvados_task->{'sequence'},
1161         'failures' => 0,
1162         'arvados_task' => $arvados_task
1163       };
1164       push @jobstep, $jobstep;
1165       push @jobstep_todo, $#jobstep;
1166     }
1167   }
1168
1169   $progress_is_dirty = 1;
1170   1;
1171 }
1172
1173 sub check_refresh_wanted
1174 {
1175   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1176   if (@stat && $stat[9] > $latest_refresh) {
1177     $latest_refresh = scalar time;
1178     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1179     for my $attr ('cancelled_at',
1180                   'cancelled_by_user_uuid',
1181                   'cancelled_by_client_uuid',
1182                   'state') {
1183       $Job->{$attr} = $Job2->{$attr};
1184     }
1185     if ($Job->{'state'} ne "Running") {
1186       if ($Job->{'state'} eq "Cancelled") {
1187         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1188       } else {
1189         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1190       }
1191       $main::success = 0;
1192       $main::please_freeze = 1;
1193     }
1194   }
1195 }
1196
1197 sub check_squeue
1198 {
1199   # return if the kill list was checked <4 seconds ago
1200   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1201   {
1202     return;
1203   }
1204   $squeue_kill_checked = time;
1205
1206   # use killem() on procs whose killtime is reached
1207   for (keys %proc)
1208   {
1209     if (exists $proc{$_}->{killtime}
1210         && $proc{$_}->{killtime} <= time)
1211     {
1212       killem ($_);
1213     }
1214   }
1215
1216   # return if the squeue was checked <60 seconds ago
1217   if (defined $squeue_checked && $squeue_checked > time - 60)
1218   {
1219     return;
1220   }
1221   $squeue_checked = time;
1222
1223   if (!$have_slurm)
1224   {
1225     # here is an opportunity to check for mysterious problems with local procs
1226     return;
1227   }
1228
1229   # get a list of steps still running
1230   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1231   chop @squeue;
1232   if ($squeue[-1] ne "ok")
1233   {
1234     return;
1235   }
1236   pop @squeue;
1237
1238   # which of my jobsteps are running, according to squeue?
1239   my %ok;
1240   foreach (@squeue)
1241   {
1242     if (/^(\d+)\.(\d+) (\S+)/)
1243     {
1244       if ($1 eq $ENV{SLURM_JOBID})
1245       {
1246         $ok{$3} = 1;
1247       }
1248     }
1249   }
1250
1251   # which of my active child procs (>60s old) were not mentioned by squeue?
1252   foreach (keys %proc)
1253   {
1254     if ($proc{$_}->{time} < time - 60
1255         && !exists $ok{$proc{$_}->{jobstepname}}
1256         && !exists $proc{$_}->{killtime})
1257     {
1258       # kill this proc if it hasn't exited in 30 seconds
1259       $proc{$_}->{killtime} = time + 30;
1260     }
1261   }
1262 }
1263
1264
1265 sub release_allocation
1266 {
1267   if ($have_slurm)
1268   {
1269     Log (undef, "release job allocation");
1270     system "scancel $ENV{SLURM_JOBID}";
1271   }
1272 }
1273
1274
1275 sub readfrompipes
1276 {
1277   my $gotsome = 0;
1278   foreach my $job (keys %reader)
1279   {
1280     my $buf;
1281     while (0 < sysread ($reader{$job}, $buf, 8192))
1282     {
1283       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1284       $jobstep[$job]->{stderr} .= $buf;
1285       preprocess_stderr ($job);
1286       if (length ($jobstep[$job]->{stderr}) > 16384)
1287       {
1288         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1289       }
1290       $gotsome = 1;
1291     }
1292   }
1293   return $gotsome;
1294 }
1295
1296
1297 sub preprocess_stderr
1298 {
1299   my $job = shift;
1300
1301   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1302     my $line = $1;
1303     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1304     Log ($job, "stderr $line");
1305     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1306       # whoa.
1307       $main::please_freeze = 1;
1308     }
1309     elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1310       $jobstep[$job]->{node_fail} = 1;
1311       ban_node_by_slot($jobstep[$job]->{slotindex});
1312     }
1313   }
1314 }
1315
1316
1317 sub process_stderr
1318 {
1319   my $job = shift;
1320   my $task_success = shift;
1321   preprocess_stderr ($job);
1322
1323   map {
1324     Log ($job, "stderr $_");
1325   } split ("\n", $jobstep[$job]->{stderr});
1326 }
1327
1328 sub fetch_block
1329 {
1330   my $hash = shift;
1331   my $keep;
1332   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1333     Log(undef, "fetch_block run error from arv-get $hash: $!");
1334     return undef;
1335   }
1336   my $output_block = "";
1337   while (1) {
1338     my $buf;
1339     my $bytes = sysread($keep, $buf, 1024 * 1024);
1340     if (!defined $bytes) {
1341       Log(undef, "fetch_block read error from arv-get: $!");
1342       $output_block = undef;
1343       last;
1344     } elsif ($bytes == 0) {
1345       # sysread returns 0 at the end of the pipe.
1346       last;
1347     } else {
1348       # some bytes were read into buf.
1349       $output_block .= $buf;
1350     }
1351   }
1352   close $keep;
1353   if ($?) {
1354     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1355     $output_block = undef;
1356   }
1357   return $output_block;
1358 }
1359
1360 # Create a collection by concatenating the output of all tasks (each
1361 # task's output is either a manifest fragment, a locator for a
1362 # manifest fragment stored in Keep, or nothing at all). Return the
1363 # portable_data_hash of the new collection.
1364 sub create_output_collection
1365 {
1366   Log (undef, "collate");
1367
1368   my ($child_out, $child_in);
1369   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1370 import arvados
1371 import sys
1372 print (arvados.api("v1").collections().
1373        create(body={"manifest_text": sys.stdin.read()}).
1374        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1375 }, retry_count());
1376
1377   my $task_idx = -1;
1378   my $manifest_size = 0;
1379   for (@jobstep)
1380   {
1381     ++$task_idx;
1382     my $output = $_->{'arvados_task'}->{output};
1383     next if (!defined($output));
1384     my $next_write;
1385     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1386       $next_write = fetch_block($output);
1387     } else {
1388       $next_write = $output;
1389     }
1390     if (defined($next_write)) {
1391       if (!defined(syswrite($child_in, $next_write))) {
1392         # There's been an error writing.  Stop the loop.
1393         # We'll log details about the exit code later.
1394         last;
1395       } else {
1396         $manifest_size += length($next_write);
1397       }
1398     } else {
1399       my $uuid = $_->{'arvados_task'}->{'uuid'};
1400       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1401       $main::success = 0;
1402     }
1403   }
1404   close($child_in);
1405   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1406
1407   my $joboutput;
1408   my $s = IO::Select->new($child_out);
1409   if ($s->can_read(120)) {
1410     sysread($child_out, $joboutput, 1024 * 1024);
1411     waitpid($pid, 0);
1412     if ($?) {
1413       Log(undef, "output collection creation exited " . exit_status_s($?));
1414       $joboutput = undef;
1415     } else {
1416       chomp($joboutput);
1417     }
1418   } else {
1419     Log (undef, "timed out while creating output collection");
1420     foreach my $signal (2, 2, 2, 15, 15, 9) {
1421       kill($signal, $pid);
1422       last if waitpid($pid, WNOHANG) == -1;
1423       sleep(1);
1424     }
1425   }
1426   close($child_out);
1427
1428   return $joboutput;
1429 }
1430
1431
1432 sub killem
1433 {
1434   foreach (@_)
1435   {
1436     my $sig = 2;                # SIGINT first
1437     if (exists $proc{$_}->{"sent_$sig"} &&
1438         time - $proc{$_}->{"sent_$sig"} > 4)
1439     {
1440       $sig = 15;                # SIGTERM if SIGINT doesn't work
1441     }
1442     if (exists $proc{$_}->{"sent_$sig"} &&
1443         time - $proc{$_}->{"sent_$sig"} > 4)
1444     {
1445       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1446     }
1447     if (!exists $proc{$_}->{"sent_$sig"})
1448     {
1449       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1450       kill $sig, $_;
1451       select (undef, undef, undef, 0.1);
1452       if ($sig == 2)
1453       {
1454         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1455       }
1456       $proc{$_}->{"sent_$sig"} = time;
1457       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1458     }
1459   }
1460 }
1461
1462
1463 sub fhbits
1464 {
1465   my($bits);
1466   for (@_) {
1467     vec($bits,fileno($_),1) = 1;
1468   }
1469   $bits;
1470 }
1471
1472
1473 # Send log output to Keep via arv-put.
1474 #
1475 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1476 # $log_pipe_pid is the pid of the arv-put subprocess.
1477 #
1478 # The only functions that should access these variables directly are:
1479 #
1480 # log_writer_start($logfilename)
1481 #     Starts an arv-put pipe, reading data on stdin and writing it to
1482 #     a $logfilename file in an output collection.
1483 #
1484 # log_writer_send($txt)
1485 #     Writes $txt to the output log collection.
1486 #
1487 # log_writer_finish()
1488 #     Closes the arv-put pipe and returns the output that it produces.
1489 #
1490 # log_writer_is_active()
1491 #     Returns a true value if there is currently a live arv-put
1492 #     process, false otherwise.
1493 #
1494 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1495
1496 sub log_writer_start($)
1497 {
1498   my $logfilename = shift;
1499   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1500                         'arv-put',
1501                         '--portable-data-hash',
1502                         '--project-uuid', $Job->{owner_uuid},
1503                         '--retries', '3',
1504                         '--name', $logfilename,
1505                         '--filename', $logfilename,
1506                         '-');
1507 }
1508
1509 sub log_writer_send($)
1510 {
1511   my $txt = shift;
1512   print $log_pipe_in $txt;
1513 }
1514
1515 sub log_writer_finish()
1516 {
1517   return unless $log_pipe_pid;
1518
1519   close($log_pipe_in);
1520   my $arv_put_output;
1521
1522   my $s = IO::Select->new($log_pipe_out);
1523   if ($s->can_read(120)) {
1524     sysread($log_pipe_out, $arv_put_output, 1024);
1525     chomp($arv_put_output);
1526   } else {
1527     Log (undef, "timed out reading from 'arv-put'");
1528   }
1529
1530   waitpid($log_pipe_pid, 0);
1531   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1532   if ($?) {
1533     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1534   }
1535
1536   return $arv_put_output;
1537 }
1538
1539 sub log_writer_is_active() {
1540   return $log_pipe_pid;
1541 }
1542
1543 sub Log                         # ($jobstep_id, $logmessage)
1544 {
1545   if ($_[1] =~ /\n/) {
1546     for my $line (split (/\n/, $_[1])) {
1547       Log ($_[0], $line);
1548     }
1549     return;
1550   }
1551   my $fh = select STDERR; $|=1; select $fh;
1552   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1553   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1554   $message .= "\n";
1555   my $datetime;
1556   if (log_writer_is_active() || -t STDERR) {
1557     my @gmtime = gmtime;
1558     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1559                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1560   }
1561   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1562
1563   if (log_writer_is_active()) {
1564     log_writer_send($datetime . " " . $message);
1565   }
1566 }
1567
1568
1569 sub croak
1570 {
1571   my ($package, $file, $line) = caller;
1572   my $message = "@_ at $file line $line\n";
1573   Log (undef, $message);
1574   freeze() if @jobstep_todo;
1575   create_output_collection() if @jobstep_todo;
1576   cleanup();
1577   save_meta();
1578   die;
1579 }
1580
1581
1582 sub cleanup
1583 {
1584   return unless $Job;
1585   if ($Job->{'state'} eq 'Cancelled') {
1586     $Job->update_attributes('finished_at' => scalar gmtime);
1587   } else {
1588     $Job->update_attributes('state' => 'Failed');
1589   }
1590 }
1591
1592
1593 sub save_meta
1594 {
1595   my $justcheckpoint = shift; # false if this will be the last meta saved
1596   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1597   return unless log_writer_is_active();
1598
1599   my $loglocator = log_writer_finish();
1600   Log (undef, "log manifest is $loglocator");
1601   $Job->{'log'} = $loglocator;
1602   $Job->update_attributes('log', $loglocator);
1603 }
1604
1605
1606 sub freeze_if_want_freeze
1607 {
1608   if ($main::please_freeze)
1609   {
1610     release_allocation();
1611     if (@_)
1612     {
1613       # kill some srun procs before freeze+stop
1614       map { $proc{$_} = {} } @_;
1615       while (%proc)
1616       {
1617         killem (keys %proc);
1618         select (undef, undef, undef, 0.1);
1619         my $died;
1620         while (($died = waitpid (-1, WNOHANG)) > 0)
1621         {
1622           delete $proc{$died};
1623         }
1624       }
1625     }
1626     freeze();
1627     create_output_collection();
1628     cleanup();
1629     save_meta();
1630     exit 1;
1631   }
1632 }
1633
1634
1635 sub freeze
1636 {
1637   Log (undef, "Freeze not implemented");
1638   return;
1639 }
1640
1641
1642 sub thaw
1643 {
1644   croak ("Thaw not implemented");
1645 }
1646
1647
1648 sub freezequote
1649 {
1650   my $s = shift;
1651   $s =~ s/\\/\\\\/g;
1652   $s =~ s/\n/\\n/g;
1653   return $s;
1654 }
1655
1656
1657 sub freezeunquote
1658 {
1659   my $s = shift;
1660   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1661   return $s;
1662 }
1663
1664
1665 sub srun
1666 {
1667   my $srunargs = shift;
1668   my $execargs = shift;
1669   my $opts = shift || {};
1670   my $stdin = shift;
1671   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1672
1673   $Data::Dumper::Terse = 1;
1674   $Data::Dumper::Indent = 0;
1675   my $show_cmd = Dumper($args);
1676   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1677   $show_cmd =~ s/\n/ /g;
1678   warn "starting: $show_cmd\n";
1679
1680   if (defined $stdin) {
1681     my $child = open STDIN, "-|";
1682     defined $child or die "no fork: $!";
1683     if ($child == 0) {
1684       print $stdin or die $!;
1685       close STDOUT or die $!;
1686       exit 0;
1687     }
1688   }
1689
1690   return system (@$args) if $opts->{fork};
1691
1692   exec @$args;
1693   warn "ENV size is ".length(join(" ",%ENV));
1694   die "exec failed: $!: @$args";
1695 }
1696
1697
1698 sub ban_node_by_slot {
1699   # Don't start any new jobsteps on this node for 60 seconds
1700   my $slotid = shift;
1701   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1702   $slot[$slotid]->{node}->{hold_count}++;
1703   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1704 }
1705
1706 sub must_lock_now
1707 {
1708   my ($lockfile, $error_message) = @_;
1709   open L, ">", $lockfile or croak("$lockfile: $!");
1710   if (!flock L, LOCK_EX|LOCK_NB) {
1711     croak("Can't lock $lockfile: $error_message\n");
1712   }
1713 }
1714
1715 sub find_docker_image {
1716   # Given a Keep locator, check to see if it contains a Docker image.
1717   # If so, return its stream name and Docker hash.
1718   # If not, return undef for both values.
1719   my $locator = shift;
1720   my ($streamname, $filename);
1721   my $image = api_call("collections/get", uuid => $locator);
1722   if ($image) {
1723     foreach my $line (split(/\n/, $image->{manifest_text})) {
1724       my @tokens = split(/\s+/, $line);
1725       next if (!@tokens);
1726       $streamname = shift(@tokens);
1727       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1728         if (defined($filename)) {
1729           return (undef, undef);  # More than one file in the Collection.
1730         } else {
1731           $filename = (split(/:/, $filedata, 3))[2];
1732         }
1733       }
1734     }
1735   }
1736   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1737     return ($streamname, $1);
1738   } else {
1739     return (undef, undef);
1740   }
1741 }
1742
1743 sub retry_count {
1744   # Calculate the number of times an operation should be retried,
1745   # assuming exponential backoff, and that we're willing to retry as
1746   # long as tasks have been running.  Enforce a minimum of 3 retries.
1747   my ($starttime, $endtime, $timediff, $retries);
1748   if (@jobstep) {
1749     $starttime = $jobstep[0]->{starttime};
1750     $endtime = $jobstep[-1]->{finishtime};
1751   }
1752   if (!defined($starttime)) {
1753     $timediff = 0;
1754   } elsif (!defined($endtime)) {
1755     $timediff = time - $starttime;
1756   } else {
1757     $timediff = ($endtime - $starttime) - (time - $endtime);
1758   }
1759   if ($timediff > 0) {
1760     $retries = int(log($timediff) / log(2));
1761   } else {
1762     $retries = 1;  # Use the minimum.
1763   }
1764   return ($retries > 3) ? $retries : 3;
1765 }
1766
1767 sub retry_op {
1768   # Pass in two function references.
1769   # This method will be called with the remaining arguments.
1770   # If it dies, retry it with exponential backoff until it succeeds,
1771   # or until the current retry_count is exhausted.  After each failure
1772   # that can be retried, the second function will be called with
1773   # the current try count (0-based), next try time, and error message.
1774   my $operation = shift;
1775   my $retry_callback = shift;
1776   my $retries = retry_count();
1777   foreach my $try_count (0..$retries) {
1778     my $next_try = time + (2 ** $try_count);
1779     my $result = eval { $operation->(@_); };
1780     if (!$@) {
1781       return $result;
1782     } elsif ($try_count < $retries) {
1783       $retry_callback->($try_count, $next_try, $@);
1784       my $sleep_time = $next_try - time;
1785       sleep($sleep_time) if ($sleep_time > 0);
1786     }
1787   }
1788   # Ensure the error message ends in a newline, so Perl doesn't add
1789   # retry_op's line number to it.
1790   chomp($@);
1791   die($@ . "\n");
1792 }
1793
1794 sub api_call {
1795   # Pass in a /-separated API method name, and arguments for it.
1796   # This function will call that method, retrying as needed until
1797   # the current retry_count is exhausted, with a log on the first failure.
1798   my $method_name = shift;
1799   my $log_api_retry = sub {
1800     my ($try_count, $next_try_at, $errmsg) = @_;
1801     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1802     $errmsg =~ s/\s/ /g;
1803     $errmsg =~ s/\s+$//;
1804     my $retry_msg;
1805     if ($next_try_at < time) {
1806       $retry_msg = "Retrying.";
1807     } else {
1808       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1809       $retry_msg = "Retrying at $next_try_fmt.";
1810     }
1811     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1812   };
1813   my $method = $arv;
1814   foreach my $key (split(/\//, $method_name)) {
1815     $method = $method->{$key};
1816   }
1817   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1818 }
1819
1820 sub exit_status_s {
1821   # Given a $?, return a human-readable exit code string like "0" or
1822   # "1" or "0 with signal 1" or "1 with signal 11".
1823   my $exitcode = shift;
1824   my $s = $exitcode >> 8;
1825   if ($exitcode & 0x7f) {
1826     $s .= " with signal " . ($exitcode & 0x7f);
1827   }
1828   if ($exitcode & 0x80) {
1829     $s .= " with core dump";
1830   }
1831   return $s;
1832 }
1833
1834 sub handle_readall {
1835   # Pass in a glob reference to a file handle.
1836   # Read all its contents and return them as a string.
1837   my $fh_glob_ref = shift;
1838   local $/ = undef;
1839   return <$fh_glob_ref>;
1840 }
1841
1842 sub tar_filename_n {
1843   my $n = shift;
1844   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1845 }
1846
1847 sub add_git_archive {
1848   # Pass in a git archive command as a string or list, a la system().
1849   # This method will save its output to be included in the archive sent to the
1850   # build script.
1851   my $git_input;
1852   $git_tar_count++;
1853   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1854     croak("Failed to save git archive: $!");
1855   }
1856   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1857   close($git_input);
1858   waitpid($git_pid, 0);
1859   close(GIT_ARCHIVE);
1860   if ($?) {
1861     croak("Failed to save git archive: git exited " . exit_status_s($?));
1862   }
1863 }
1864
1865 sub combined_git_archive {
1866   # Combine all saved tar archives into a single archive, then return its
1867   # contents in a string.  Return undef if no archives have been saved.
1868   if ($git_tar_count < 1) {
1869     return undef;
1870   }
1871   my $base_tar_name = tar_filename_n(1);
1872   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1873     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1874     if ($tar_exit != 0) {
1875       croak("Error preparing build archive: tar -A exited " .
1876             exit_status_s($tar_exit));
1877     }
1878   }
1879   if (!open(GIT_TAR, "<", $base_tar_name)) {
1880     croak("Could not open build archive: $!");
1881   }
1882   my $tar_contents = handle_readall(\*GIT_TAR);
1883   close(GIT_TAR);
1884   return $tar_contents;
1885 }
1886
1887 sub set_nonblocking {
1888   my $fh = shift;
1889   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
1890   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
1891 }
1892
1893 __DATA__
1894 #!/usr/bin/perl
1895 #
1896 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1897 # server invokes this script on individual compute nodes, or localhost if we're
1898 # running a job locally.  It gets called in two modes:
1899 #
1900 # * No arguments: Installation mode.  Read a tar archive from the DATA
1901 #   file handle; it includes the Crunch script's source code, and
1902 #   maybe SDKs as well.  Those should be installed in the proper
1903 #   locations.  This runs outside of any Docker container, so don't try to
1904 #   introspect Crunch's runtime environment.
1905 #
1906 # * With arguments: Crunch script run mode.  This script should set up the
1907 #   environment, then run the command specified in the arguments.  This runs
1908 #   inside any Docker container.
1909
1910 use Fcntl ':flock';
1911 use File::Path qw( make_path remove_tree );
1912 use POSIX qw(getcwd);
1913
1914 use constant TASK_TEMPFAIL => 111;
1915
1916 # Map SDK subdirectories to the path environments they belong to.
1917 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1918
1919 my $destdir = $ENV{"CRUNCH_SRC"};
1920 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
1921 my $repo = $ENV{"CRUNCH_SRC_URL"};
1922 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1923 my $job_work = $ENV{"JOB_WORK"};
1924 my $task_work = $ENV{"TASK_WORK"};
1925
1926 open(STDOUT_ORIG, ">&", STDOUT);
1927 open(STDERR_ORIG, ">&", STDERR);
1928
1929 for my $dir ($destdir, $job_work, $task_work) {
1930   if ($dir) {
1931     make_path $dir;
1932     -e $dir or die "Failed to create temporary directory ($dir): $!";
1933   }
1934 }
1935
1936 if ($task_work) {
1937   remove_tree($task_work, {keep_root => 1});
1938 }
1939
1940 ### Crunch script run mode
1941 if (@ARGV) {
1942   # We want to do routine logging during task 0 only.  This gives the user
1943   # the information they need, but avoids repeating the information for every
1944   # task.
1945   my $Log;
1946   if ($ENV{TASK_SEQUENCE} eq "0") {
1947     $Log = sub {
1948       my $msg = shift;
1949       printf STDERR_ORIG "[Crunch] $msg\n", @_;
1950     };
1951   } else {
1952     $Log = sub { };
1953   }
1954
1955   my $python_src = "$install_dir/python";
1956   my $venv_dir = "$job_work/.arvados.venv";
1957   my $venv_built = -e "$venv_dir/bin/activate";
1958   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1959     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
1960                  "--python=python2.7", $venv_dir);
1961     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1962     $venv_built = 1;
1963     $Log->("Built Python SDK virtualenv");
1964   }
1965
1966   my $pip_bin = "pip";
1967   if ($venv_built) {
1968     $Log->("Running in Python SDK virtualenv");
1969     $pip_bin = "$venv_dir/bin/pip";
1970     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
1971     @ARGV = ("/bin/sh", "-ec",
1972              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
1973   } elsif (-d $python_src) {
1974     $Log->("Warning: virtualenv not found inside Docker container default " .
1975            "\$PATH. Can't install Python SDK.");
1976   }
1977
1978   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
1979   if ($pkgs) {
1980     $Log->("Using Arvados SDK:");
1981     foreach my $line (split /\n/, $pkgs) {
1982       $Log->($line);
1983     }
1984   } else {
1985     $Log->("Arvados SDK packages not found");
1986   }
1987
1988   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
1989     my $sdk_path = "$install_dir/$sdk_dir";
1990     if (-d $sdk_path) {
1991       if ($ENV{$sdk_envkey}) {
1992         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
1993       } else {
1994         $ENV{$sdk_envkey} = $sdk_path;
1995       }
1996       $Log->("Arvados SDK added to %s", $sdk_envkey);
1997     }
1998   }
1999
2000   exec(@ARGV);
2001   die "Cannot exec `@ARGV`: $!";
2002 }
2003
2004 ### Installation mode
2005 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2006 flock L, LOCK_EX;
2007 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2008   # This exact git archive (source + arvados sdk) is already installed
2009   # here, so there's no need to reinstall it.
2010
2011   # We must consume our DATA section, though: otherwise the process
2012   # feeding it to us will get SIGPIPE.
2013   my $buf;
2014   while (read(DATA, $buf, 65536)) { }
2015
2016   exit(0);
2017 }
2018
2019 unlink "$destdir.archive_hash";
2020 mkdir $destdir;
2021
2022 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
2023   die "Error launching 'tar -xC $destdir': $!";
2024 }
2025 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2026 # get SIGPIPE.  We must feed it data incrementally.
2027 my $tar_input;
2028 while (read(DATA, $tar_input, 65536)) {
2029   print TARX $tar_input;
2030 }
2031 if(!close(TARX)) {
2032   die "'tar -xC $destdir' exited $?: $!";
2033 }
2034
2035 mkdir $install_dir;
2036
2037 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2038 if (-d $sdk_root) {
2039   foreach my $sdk_lang (("python",
2040                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2041     if (-d "$sdk_root/$sdk_lang") {
2042       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2043         die "Failed to install $sdk_lang SDK: $!";
2044       }
2045     }
2046   }
2047 }
2048
2049 my $python_dir = "$install_dir/python";
2050 if ((-d $python_dir) and can_run("python2.7") and
2051     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
2052   # egg_info failed, probably when it asked git for a build tag.
2053   # Specify no build tag.
2054   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2055   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2056   close($pysdk_cfg);
2057 }
2058
2059 # Hide messages from the install script (unless it fails: shell_or_die
2060 # will show $destdir.log in that case).
2061 open(STDOUT, ">>", "$destdir.log");
2062 open(STDERR, ">&", STDOUT);
2063
2064 if (-e "$destdir/crunch_scripts/install") {
2065     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2066 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2067     # Old version
2068     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2069 } elsif (-e "./install.sh") {
2070     shell_or_die (undef, "./install.sh", $install_dir);
2071 }
2072
2073 if ($archive_hash) {
2074     unlink "$destdir.archive_hash.new";
2075     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2076     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2077 }
2078
2079 close L;
2080
2081 sub can_run {
2082   my $command_name = shift;
2083   open(my $which, "-|", "which", $command_name);
2084   while (<$which>) { }
2085   close($which);
2086   return ($? == 0);
2087 }
2088
2089 sub shell_or_die
2090 {
2091   my $exitcode = shift;
2092
2093   if ($ENV{"DEBUG"}) {
2094     print STDERR "@_\n";
2095   }
2096   if (system (@_) != 0) {
2097     my $err = $!;
2098     my $code = $?;
2099     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2100     open STDERR, ">&STDERR_ORIG";
2101     system ("cat $destdir.log >&2");
2102     warn "@_ failed ($err): $exitstatus";
2103     if (defined($exitcode)) {
2104       exit $exitcode;
2105     }
2106     else {
2107       exit (($code >> 8) || 1);
2108     }
2109   }
2110 }
2111
2112 __DATA__