5717: crunch-job uses fewer slots when few tasks at this level.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105   if ($ENV{"USER"} ne "crunch" && $< != 0) {
106     # use a tmp dir unique for my uid
107     $ENV{"CRUNCH_TMP"} .= "-$<";
108   }
109 }
110
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
114 }
115
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
120
121 my $force_unlock;
122 my $git_dir;
123 my $jobspec;
124 my $job_api_token;
125 my $no_clear_tmp;
126 my $resume_stash;
127 GetOptions('force-unlock' => \$force_unlock,
128            'git-dir=s' => \$git_dir,
129            'job=s' => \$jobspec,
130            'job-api-token=s' => \$job_api_token,
131            'no-clear-tmp' => \$no_clear_tmp,
132            'resume-stash=s' => \$resume_stash,
133     );
134
135 if (defined $job_api_token) {
136   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
137 }
138
139 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
140 my $local_job = 0;
141
142
143 $SIG{'USR1'} = sub
144 {
145   $main::ENV{CRUNCH_DEBUG} = 1;
146 };
147 $SIG{'USR2'} = sub
148 {
149   $main::ENV{CRUNCH_DEBUG} = 0;
150 };
151
152
153
154 my $arv = Arvados->new('apiVersion' => 'v1');
155
156 my $Job;
157 my $job_id;
158 my $dbh;
159 my $sth;
160 my @jobstep;
161
162 my $User = api_call("users/current");
163
164 if ($jobspec =~ /^[-a-z\d]+$/)
165 {
166   # $jobspec is an Arvados UUID, not a JSON job specification
167   $Job = api_call("jobs/get", uuid => $jobspec);
168   if (!$force_unlock) {
169     # Claim this job, and make sure nobody else does
170     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
171     if ($@) {
172       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
173       exit EX_TEMPFAIL;
174     };
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189   $Job->{'state'} = 'Running';
190
191   $Job = api_call("jobs/create", job => $Job);
192 }
193 $job_id = $Job->{'uuid'};
194
195 my $keep_logfile = $job_id . '.log.txt';
196 log_writer_start($keep_logfile);
197
198 $Job->{'runtime_constraints'} ||= {};
199 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
200 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
201
202 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
203 if ($? == 0) {
204   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
205   chomp($gem_versions);
206   chop($gem_versions);  # Closing parentheses
207 } else {
208   $gem_versions = "";
209 }
210 Log(undef,
211     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
212
213 Log (undef, "check slurm allocation");
214 my @slot;
215 my @node;
216 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
217 my @sinfo;
218 if (!$have_slurm)
219 {
220   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
221   push @sinfo, "$localcpus localhost";
222 }
223 if (exists $ENV{SLURM_NODELIST})
224 {
225   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
226 }
227 foreach (@sinfo)
228 {
229   my ($ncpus, $slurm_nodelist) = split;
230   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
231
232   my @nodelist;
233   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
234   {
235     my $nodelist = $1;
236     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
237     {
238       my $ranges = $1;
239       foreach (split (",", $ranges))
240       {
241         my ($a, $b);
242         if (/(\d+)-(\d+)/)
243         {
244           $a = $1;
245           $b = $2;
246         }
247         else
248         {
249           $a = $_;
250           $b = $_;
251         }
252         push @nodelist, map {
253           my $n = $nodelist;
254           $n =~ s/\[[-,\d]+\]/$_/;
255           $n;
256         } ($a..$b);
257       }
258     }
259     else
260     {
261       push @nodelist, $nodelist;
262     }
263   }
264   foreach my $nodename (@nodelist)
265   {
266     Log (undef, "node $nodename - $ncpus slots");
267     my $node = { name => $nodename,
268                  ncpus => $ncpus,
269                  losing_streak => 0,
270                  hold_until => 0 };
271     foreach my $cpu (1..$ncpus)
272     {
273       push @slot, { node => $node,
274                     cpu => $cpu };
275     }
276   }
277   push @node, @nodelist;
278 }
279
280
281
282 # Ensure that we get one jobstep running on each allocated node before
283 # we start overloading nodes with concurrent steps
284
285 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
286
287
288 $Job->update_attributes(
289   'tasks_summary' => { 'failed' => 0,
290                        'todo' => 1,
291                        'running' => 0,
292                        'done' => 0 });
293
294 Log (undef, "start");
295 $SIG{'INT'} = sub { $main::please_freeze = 1; };
296 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
297 $SIG{'TERM'} = \&croak;
298 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
299 $SIG{'ALRM'} = sub { $main::please_info = 1; };
300 $SIG{'CONT'} = sub { $main::please_continue = 1; };
301 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
302
303 $main::please_freeze = 0;
304 $main::please_info = 0;
305 $main::please_continue = 0;
306 $main::please_refresh = 0;
307 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
308
309 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
310 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
311 $ENV{"JOB_UUID"} = $job_id;
312
313
314 my @jobstep_todo = ();
315 my @jobstep_done = ();
316 my @jobstep_tomerge = ();
317 my $jobstep_tomerge_level = 0;
318 my $squeue_checked;
319 my $squeue_kill_checked;
320 my $latest_refresh = scalar time;
321
322
323
324 if (defined $Job->{thawedfromkey})
325 {
326   thaw ($Job->{thawedfromkey});
327 }
328 else
329 {
330   my $first_task = api_call("job_tasks/create", job_task => {
331     'job_uuid' => $Job->{'uuid'},
332     'sequence' => 0,
333     'qsequence' => 0,
334     'parameters' => {},
335   });
336   push @jobstep, { 'level' => 0,
337                    'failures' => 0,
338                    'arvados_task' => $first_task,
339                  };
340   push @jobstep_todo, 0;
341 }
342
343
344 if (!$have_slurm)
345 {
346   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
347 }
348
349 my $build_script = handle_readall(\*DATA);
350 my $nodelist = join(",", @node);
351 my $git_tar_count = 0;
352
353 if (!defined $no_clear_tmp) {
354   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
355   Log (undef, "Clean work dirs");
356
357   my $cleanpid = fork();
358   if ($cleanpid == 0)
359   {
360     # Find FUSE mounts that look like Keep mounts (the mount path has the
361     # word "keep") and unmount them.  Then clean up work directories.
362     # TODO: When #5036 is done and widely deployed, we can get rid of the
363     # regular expression and just unmount everything with type fuse.keep.
364     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
365           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
366     exit (1);
367   }
368   while (1)
369   {
370     last if $cleanpid == waitpid (-1, WNOHANG);
371     freeze_if_want_freeze ($cleanpid);
372     select (undef, undef, undef, 0.1);
373   }
374   Log (undef, "Cleanup command exited ".exit_status_s($?));
375 }
376
377 # If this job requires a Docker image, install that.
378 my $docker_bin = "/usr/bin/docker.io";
379 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
380 if ($docker_locator = $Job->{docker_image_locator}) {
381   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
382   if (!$docker_hash)
383   {
384     croak("No Docker image hash found from locator $docker_locator");
385   }
386   $docker_stream =~ s/^\.//;
387   my $docker_install_script = qq{
388 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
389     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
390 fi
391 };
392   my $docker_pid = fork();
393   if ($docker_pid == 0)
394   {
395     srun (["srun", "--nodelist=" . join(',', @node)],
396           ["/bin/sh", "-ec", $docker_install_script]);
397     exit ($?);
398   }
399   while (1)
400   {
401     last if $docker_pid == waitpid (-1, WNOHANG);
402     freeze_if_want_freeze ($docker_pid);
403     select (undef, undef, undef, 0.1);
404   }
405   if ($? != 0)
406   {
407     croak("Installing Docker image from $docker_locator exited "
408           .exit_status_s($?));
409   }
410
411   # Determine whether this version of Docker supports memory+swap limits.
412   srun(["srun", "--nodelist=" . $node[0]],
413        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
414       {fork => 1});
415   $docker_limitmem = ($? == 0);
416
417   if ($Job->{arvados_sdk_version}) {
418     # The job also specifies an Arvados SDK version.  Add the SDKs to the
419     # tar file for the build script to install.
420     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
421                        $Job->{arvados_sdk_version}));
422     add_git_archive("git", "--git-dir=$git_dir", "archive",
423                     "--prefix=.arvados.sdk/",
424                     $Job->{arvados_sdk_version}, "sdk");
425   }
426 }
427
428 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
429   # If script_version looks like an absolute path, *and* the --git-dir
430   # argument was not given -- which implies we were not invoked by
431   # crunch-dispatch -- we will use the given path as a working
432   # directory instead of resolving script_version to a git commit (or
433   # doing anything else with git).
434   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
435   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
436 }
437 else {
438   # Resolve the given script_version to a git commit sha1. Also, if
439   # the repository is remote, clone it into our local filesystem: this
440   # ensures "git archive" will work, and is necessary to reliably
441   # resolve a symbolic script_version like "master^".
442   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
443
444   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
445
446   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
447
448   # If we're running under crunch-dispatch, it will have already
449   # pulled the appropriate source tree into its own repository, and
450   # given us that repo's path as $git_dir.
451   #
452   # If we're running a "local" job, we might have to fetch content
453   # from a remote repository.
454   #
455   # (Currently crunch-dispatch gives a local path with --git-dir, but
456   # we might as well accept URLs there too in case it changes its
457   # mind.)
458   my $repo = $git_dir || $Job->{'repository'};
459
460   # Repository can be remote or local. If remote, we'll need to fetch it
461   # to a local dir before doing `git log` et al.
462   my $repo_location;
463
464   if ($repo =~ m{://|^[^/]*:}) {
465     # $repo is a git url we can clone, like git:// or https:// or
466     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
467     # not recognized here because distinguishing that from a local
468     # path is too fragile. If you really need something strange here,
469     # use the ssh:// form.
470     $repo_location = 'remote';
471   } elsif ($repo =~ m{^\.*/}) {
472     # $repo is a local path to a git index. We'll also resolve ../foo
473     # to ../foo/.git if the latter is a directory. To help
474     # disambiguate local paths from named hosted repositories, this
475     # form must be given as ./ or ../ if it's a relative path.
476     if (-d "$repo/.git") {
477       $repo = "$repo/.git";
478     }
479     $repo_location = 'local';
480   } else {
481     # $repo is none of the above. It must be the name of a hosted
482     # repository.
483     my $arv_repo_list = api_call("repositories/list",
484                                  'filters' => [['name','=',$repo]]);
485     my @repos_found = @{$arv_repo_list->{'items'}};
486     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
487     if ($n_found > 0) {
488       Log(undef, "Repository '$repo' -> "
489           . join(", ", map { $_->{'uuid'} } @repos_found));
490     }
491     if ($n_found != 1) {
492       croak("Error: Found $n_found repositories with name '$repo'.");
493     }
494     $repo = $repos_found[0]->{'fetch_url'};
495     $repo_location = 'remote';
496   }
497   Log(undef, "Using $repo_location repository '$repo'");
498   $ENV{"CRUNCH_SRC_URL"} = $repo;
499
500   # Resolve given script_version (we'll call that $treeish here) to a
501   # commit sha1 ($commit).
502   my $treeish = $Job->{'script_version'};
503   my $commit;
504   if ($repo_location eq 'remote') {
505     # We minimize excess object-fetching by re-using the same bare
506     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
507     # just keep adding remotes to it as needed.
508     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
509     my $gitcmd = "git --git-dir=\Q$local_repo\E";
510
511     # Set up our local repo for caching remote objects, making
512     # archives, etc.
513     if (!-d $local_repo) {
514       make_path($local_repo) or croak("Error: could not create $local_repo");
515     }
516     # This works (exits 0 and doesn't delete fetched objects) even
517     # if $local_repo is already initialized:
518     `$gitcmd init --bare`;
519     if ($?) {
520       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
521     }
522
523     # If $treeish looks like a hash (or abbrev hash) we look it up in
524     # our local cache first, since that's cheaper. (We don't want to
525     # do that with tags/branches though -- those change over time, so
526     # they should always be resolved by the remote repo.)
527     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
528       # Hide stderr because it's normal for this to fail:
529       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
530       if ($? == 0 &&
531           # Careful not to resolve a branch named abcdeff to commit 1234567:
532           $sha1 =~ /^$treeish/ &&
533           $sha1 =~ /^([0-9a-f]{40})$/s) {
534         $commit = $1;
535         Log(undef, "Commit $commit already present in $local_repo");
536       }
537     }
538
539     if (!defined $commit) {
540       # If $treeish isn't just a hash or abbrev hash, or isn't here
541       # yet, we need to fetch the remote to resolve it correctly.
542
543       # First, remove all local heads. This prevents a name that does
544       # not exist on the remote from resolving to (or colliding with)
545       # a previously fetched branch or tag (possibly from a different
546       # remote).
547       remove_tree("$local_repo/refs/heads", {keep_root => 1});
548
549       Log(undef, "Fetching objects from $repo to $local_repo");
550       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
551       if ($?) {
552         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
553       }
554     }
555
556     # Now that the data is all here, we will use our local repo for
557     # the rest of our git activities.
558     $repo = $local_repo;
559   }
560
561   my $gitcmd = "git --git-dir=\Q$repo\E";
562   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
563   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
564     croak("`$gitcmd rev-list` exited "
565           .exit_status_s($?)
566           .", '$treeish' not found. Giving up.");
567   }
568   $commit = $1;
569   Log(undef, "Version $treeish is commit $commit");
570
571   if ($commit ne $Job->{'script_version'}) {
572     # Record the real commit id in the database, frozentokey, logs,
573     # etc. -- instead of an abbreviation or a branch name which can
574     # become ambiguous or point to a different commit in the future.
575     if (!$Job->update_attributes('script_version' => $commit)) {
576       croak("Error: failed to update job's script_version attribute");
577     }
578   }
579
580   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
581   add_git_archive("$gitcmd archive ''\Q$commit\E");
582 }
583
584 my $git_archive = combined_git_archive();
585 if (!defined $git_archive) {
586   Log(undef, "Skip install phase (no git archive)");
587   if ($have_slurm) {
588     Log(undef, "Warning: This probably means workers have no source tree!");
589   }
590 }
591 else {
592   Log(undef, "Run install script on all workers");
593
594   my @srunargs = ("srun",
595                   "--nodelist=$nodelist",
596                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
597   my @execargs = ("sh", "-c",
598                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
599
600   my $installpid = fork();
601   if ($installpid == 0)
602   {
603     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
604     exit (1);
605   }
606   while (1)
607   {
608     last if $installpid == waitpid (-1, WNOHANG);
609     freeze_if_want_freeze ($installpid);
610     select (undef, undef, undef, 0.1);
611   }
612   my $install_exited = $?;
613   Log (undef, "Install script exited ".exit_status_s($install_exited));
614   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
615     unlink($tar_filename);
616   }
617   exit (1) if $install_exited != 0;
618 }
619
620 foreach (qw (script script_version script_parameters runtime_constraints))
621 {
622   Log (undef,
623        "$_ " .
624        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
625 }
626 foreach (split (/\n/, $Job->{knobs}))
627 {
628   Log (undef, "knob " . $_);
629 }
630
631
632
633 $main::success = undef;
634
635
636
637 ONELEVEL:
638
639 my $thisround_succeeded = 0;
640 my $thisround_failed = 0;
641 my $thisround_failed_multiple = 0;
642
643 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
644                        or $a <=> $b } @jobstep_todo;
645 my $level = $jobstep[$jobstep_todo[0]]->{level};
646
647 my $initial_tasks_this_level = 0;
648 foreach my $id (@jobstep_todo) {
649   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
650 }
651
652 # If the number of tasks scheduled at this level #T is smaller than the number
653 # of slots available #S, only use the first #T slots, or the first slot on
654 # each node, whichever number is greater.
655 #
656 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
657 # based on these numbers.  Using fewer slots makes more resources available
658 # to each individual task, which should normally be a better strategy when
659 # there are fewer of them running with less parallelism.
660 #
661 # Note that this calculation is not redone if the initial tasks at
662 # this level queue more tasks at the same level.  This may harm
663 # overall task throughput for that level.
664 my @freeslot;
665 if ($initial_tasks_this_level < @node) {
666   @freeslot = (0..$#node);
667 } elsif ($initial_tasks_this_level < @slot) {
668   @freeslot = (0..$initial_tasks_this_level - 1);
669 } else {
670   @freeslot = (0..$#slot);
671 }
672 my $round_num_freeslots = scalar(@freeslot);
673
674 my %round_max_slots = ();
675 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
676   my $this_slot = $slot[$freeslot[$ii]];
677   my $node_name = $this_slot->{node}->{name};
678   $round_max_slots{$node_name} ||= $this_slot->{cpu};
679   last if (scalar(keys(%round_max_slots)) >= @node);
680 }
681
682 Log(undef, "start level $level with $round_num_freeslots slots");
683 my %proc;
684 my @holdslot;
685 my %reader;
686 my $progress_is_dirty = 1;
687 my $progress_stats_updated = 0;
688
689 update_progress_stats();
690
691
692 THISROUND:
693 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
694 {
695   my $id = $jobstep_todo[$todo_ptr];
696   my $Jobstep = $jobstep[$id];
697   if ($Jobstep->{level} != $level)
698   {
699     next;
700   }
701
702   pipe $reader{$id}, "writer" or croak ($!);
703   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
704   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
705
706   my $childslot = $freeslot[0];
707   my $childnode = $slot[$childslot]->{node};
708   my $childslotname = join (".",
709                             $slot[$childslot]->{node}->{name},
710                             $slot[$childslot]->{cpu});
711
712   my $childpid = fork();
713   if ($childpid == 0)
714   {
715     $SIG{'INT'} = 'DEFAULT';
716     $SIG{'QUIT'} = 'DEFAULT';
717     $SIG{'TERM'} = 'DEFAULT';
718
719     foreach (values (%reader))
720     {
721       close($_);
722     }
723     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
724     open(STDOUT,">&writer");
725     open(STDERR,">&writer");
726
727     undef $dbh;
728     undef $sth;
729
730     delete $ENV{"GNUPGHOME"};
731     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
732     $ENV{"TASK_QSEQUENCE"} = $id;
733     $ENV{"TASK_SEQUENCE"} = $level;
734     $ENV{"JOB_SCRIPT"} = $Job->{script};
735     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
736       $param =~ tr/a-z/A-Z/;
737       $ENV{"JOB_PARAMETER_$param"} = $value;
738     }
739     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
740     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
741     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
742     $ENV{"HOME"} = $ENV{"TASK_WORK"};
743     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
744     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
745     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
746     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
747
748     $ENV{"GZIP"} = "-n";
749
750     my @srunargs = (
751       "srun",
752       "--nodelist=".$childnode->{name},
753       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
754       "--job-name=$job_id.$id.$$",
755         );
756     my $command =
757         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
758         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
759         ."&& cd $ENV{CRUNCH_TMP} "
760         # These environment variables get used explicitly later in
761         # $command.  No tool is expected to read these values directly.
762         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
763         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
764         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
765         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
766     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
767     if ($docker_hash)
768     {
769       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
770       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
771       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
772       # We only set memory limits if Docker lets us limit both memory and swap.
773       # Memory limits alone have been supported longer, but subprocesses tend
774       # to get SIGKILL if they exceed that without any swap limit set.
775       # See #5642 for additional background.
776       if ($docker_limitmem) {
777         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
778       }
779
780       # Dynamically configure the container to use the host system as its
781       # DNS server.  Get the host's global addresses from the ip command,
782       # and turn them into docker --dns options using gawk.
783       $command .=
784           q{$(ip -o address show scope global |
785               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
786
787       # The source tree and $destdir directory (which we have
788       # installed on the worker host) are available in the container,
789       # under the same path.
790       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
791       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
792
793       # Currently, we make arv-mount's mount point appear at /keep
794       # inside the container (instead of using the same path as the
795       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
796       # crunch scripts and utilities must not rely on this. They must
797       # use $TASK_KEEPMOUNT.
798       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
799       $ENV{TASK_KEEPMOUNT} = "/keep";
800
801       # TASK_WORK is almost exactly like a docker data volume: it
802       # starts out empty, is writable, and persists until no
803       # containers use it any more. We don't use --volumes-from to
804       # share it with other containers: it is only accessible to this
805       # task, and it goes away when this task stops.
806       #
807       # However, a docker data volume is writable only by root unless
808       # the mount point already happens to exist in the container with
809       # different permissions. Therefore, we [1] assume /tmp already
810       # exists in the image and is writable by the crunch user; [2]
811       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
812       # writable if they are created by docker while setting up the
813       # other --volumes); and [3] create $TASK_WORK inside the
814       # container using $build_script.
815       $command .= "--volume=/tmp ";
816       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
817       $ENV{"HOME"} = $ENV{"TASK_WORK"};
818       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
819
820       # TODO: Share a single JOB_WORK volume across all task
821       # containers on a given worker node, and delete it when the job
822       # ends (and, in case that doesn't work, when the next job
823       # starts).
824       #
825       # For now, use the same approach as TASK_WORK above.
826       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
827
828       while (my ($env_key, $env_val) = each %ENV)
829       {
830         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
831           $command .= "--env=\Q$env_key=$env_val\E ";
832         }
833       }
834       $command .= "--env=\QHOME=$ENV{HOME}\E ";
835       $command .= "\Q$docker_hash\E ";
836       $command .= "stdbuf --output=0 --error=0 ";
837       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
838     } else {
839       # Non-docker run
840       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
841       $command .= "stdbuf --output=0 --error=0 ";
842       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
843     }
844
845     my @execargs = ('bash', '-c', $command);
846     srun (\@srunargs, \@execargs, undef, $build_script);
847     # exec() failed, we assume nothing happened.
848     die "srun() failed on build script\n";
849   }
850   close("writer");
851   if (!defined $childpid)
852   {
853     close $reader{$id};
854     delete $reader{$id};
855     next;
856   }
857   shift @freeslot;
858   $proc{$childpid} = { jobstep => $id,
859                        time => time,
860                        slot => $childslot,
861                        jobstepname => "$job_id.$id.$childpid",
862                      };
863   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
864   $slot[$childslot]->{pid} = $childpid;
865
866   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
867   Log ($id, "child $childpid started on $childslotname");
868   $Jobstep->{starttime} = time;
869   $Jobstep->{node} = $childnode->{name};
870   $Jobstep->{slotindex} = $childslot;
871   delete $Jobstep->{stderr};
872   delete $Jobstep->{finishtime};
873
874   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
875   $Jobstep->{'arvados_task'}->save;
876
877   splice @jobstep_todo, $todo_ptr, 1;
878   --$todo_ptr;
879
880   $progress_is_dirty = 1;
881
882   while (!@freeslot
883          ||
884          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
885   {
886     last THISROUND if $main::please_freeze || defined($main::success);
887     if ($main::please_info)
888     {
889       $main::please_info = 0;
890       freeze();
891       create_output_collection();
892       save_meta(1);
893       update_progress_stats();
894     }
895     my $gotsome
896         = readfrompipes ()
897         + reapchildren ();
898     if (!$gotsome)
899     {
900       check_refresh_wanted();
901       check_squeue();
902       update_progress_stats();
903       select (undef, undef, undef, 0.1);
904     }
905     elsif (time - $progress_stats_updated >= 30)
906     {
907       update_progress_stats();
908     }
909     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
910         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
911     {
912       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
913           .($thisround_failed+$thisround_succeeded)
914           .") -- giving up on this round";
915       Log (undef, $message);
916       last THISROUND;
917     }
918
919     # move slots from freeslot to holdslot (or back to freeslot) if necessary
920     for (my $i=$#freeslot; $i>=0; $i--) {
921       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
922         push @holdslot, (splice @freeslot, $i, 1);
923       }
924     }
925     for (my $i=$#holdslot; $i>=0; $i--) {
926       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
927         push @freeslot, (splice @holdslot, $i, 1);
928       }
929     }
930
931     # give up if no nodes are succeeding
932     if (!grep { $_->{node}->{losing_streak} == 0 &&
933                     $_->{node}->{hold_count} < 4 } @slot) {
934       my $message = "Every node has failed -- giving up on this round";
935       Log (undef, $message);
936       last THISROUND;
937     }
938   }
939 }
940
941
942 push @freeslot, splice @holdslot;
943 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
944
945
946 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
947 while (%proc)
948 {
949   if ($main::please_continue) {
950     $main::please_continue = 0;
951     goto THISROUND;
952   }
953   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
954   readfrompipes ();
955   if (!reapchildren())
956   {
957     check_refresh_wanted();
958     check_squeue();
959     update_progress_stats();
960     select (undef, undef, undef, 0.1);
961     killem (keys %proc) if $main::please_freeze;
962   }
963 }
964
965 update_progress_stats();
966 freeze_if_want_freeze();
967
968
969 if (!defined $main::success)
970 {
971   if (@jobstep_todo &&
972       $thisround_succeeded == 0 &&
973       ($thisround_failed == 0 || $thisround_failed > 4))
974   {
975     my $message = "stop because $thisround_failed tasks failed and none succeeded";
976     Log (undef, $message);
977     $main::success = 0;
978   }
979   if (!@jobstep_todo)
980   {
981     $main::success = 1;
982   }
983 }
984
985 goto ONELEVEL if !defined $main::success;
986
987
988 release_allocation();
989 freeze();
990 my $collated_output = &create_output_collection();
991
992 if (!$collated_output) {
993   Log (undef, "Failed to write output collection");
994 }
995 else {
996   Log(undef, "job output $collated_output");
997   $Job->update_attributes('output' => $collated_output);
998 }
999
1000 Log (undef, "finish");
1001
1002 save_meta();
1003
1004 my $final_state;
1005 if ($collated_output && $main::success) {
1006   $final_state = 'Complete';
1007 } else {
1008   $final_state = 'Failed';
1009 }
1010 $Job->update_attributes('state' => $final_state);
1011
1012 exit (($final_state eq 'Complete') ? 0 : 1);
1013
1014
1015
1016 sub update_progress_stats
1017 {
1018   $progress_stats_updated = time;
1019   return if !$progress_is_dirty;
1020   my ($todo, $done, $running) = (scalar @jobstep_todo,
1021                                  scalar @jobstep_done,
1022                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1023   $Job->{'tasks_summary'} ||= {};
1024   $Job->{'tasks_summary'}->{'todo'} = $todo;
1025   $Job->{'tasks_summary'}->{'done'} = $done;
1026   $Job->{'tasks_summary'}->{'running'} = $running;
1027   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1028   Log (undef, "status: $done done, $running running, $todo todo");
1029   $progress_is_dirty = 0;
1030 }
1031
1032
1033
1034 sub reapchildren
1035 {
1036   my $pid = waitpid (-1, WNOHANG);
1037   return 0 if $pid <= 0;
1038
1039   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1040                   . "."
1041                   . $slot[$proc{$pid}->{slot}]->{cpu});
1042   my $jobstepid = $proc{$pid}->{jobstep};
1043   my $elapsed = time - $proc{$pid}->{time};
1044   my $Jobstep = $jobstep[$jobstepid];
1045
1046   my $childstatus = $?;
1047   my $exitvalue = $childstatus >> 8;
1048   my $exitinfo = "exit ".exit_status_s($childstatus);
1049   $Jobstep->{'arvados_task'}->reload;
1050   my $task_success = $Jobstep->{'arvados_task'}->{success};
1051
1052   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1053
1054   if (!defined $task_success) {
1055     # task did not indicate one way or the other --> fail
1056     $Jobstep->{'arvados_task'}->{success} = 0;
1057     $Jobstep->{'arvados_task'}->save;
1058     $task_success = 0;
1059   }
1060
1061   if (!$task_success)
1062   {
1063     my $temporary_fail;
1064     $temporary_fail ||= $Jobstep->{node_fail};
1065     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1066
1067     ++$thisround_failed;
1068     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1069
1070     # Check for signs of a failed or misconfigured node
1071     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1072         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1073       # Don't count this against jobstep failure thresholds if this
1074       # node is already suspected faulty and srun exited quickly
1075       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1076           $elapsed < 5) {
1077         Log ($jobstepid, "blaming failure on suspect node " .
1078              $slot[$proc{$pid}->{slot}]->{node}->{name});
1079         $temporary_fail ||= 1;
1080       }
1081       ban_node_by_slot($proc{$pid}->{slot});
1082     }
1083
1084     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1085                              ++$Jobstep->{'failures'},
1086                              $temporary_fail ? 'temporary ' : 'permanent',
1087                              $elapsed));
1088
1089     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1090       # Give up on this task, and the whole job
1091       $main::success = 0;
1092     }
1093     # Put this task back on the todo queue
1094     push @jobstep_todo, $jobstepid;
1095     $Job->{'tasks_summary'}->{'failed'}++;
1096   }
1097   else
1098   {
1099     ++$thisround_succeeded;
1100     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1101     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1102     push @jobstep_done, $jobstepid;
1103     Log ($jobstepid, "success in $elapsed seconds");
1104   }
1105   $Jobstep->{exitcode} = $childstatus;
1106   $Jobstep->{finishtime} = time;
1107   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1108   $Jobstep->{'arvados_task'}->save;
1109   process_stderr ($jobstepid, $task_success);
1110   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1111                            length($Jobstep->{'arvados_task'}->{output}),
1112                            $Jobstep->{'arvados_task'}->{output}));
1113
1114   close $reader{$jobstepid};
1115   delete $reader{$jobstepid};
1116   delete $slot[$proc{$pid}->{slot}]->{pid};
1117   push @freeslot, $proc{$pid}->{slot};
1118   delete $proc{$pid};
1119
1120   if ($task_success) {
1121     # Load new tasks
1122     my $newtask_list = [];
1123     my $newtask_results;
1124     do {
1125       $newtask_results = api_call(
1126         "job_tasks/list",
1127         'where' => {
1128           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1129         },
1130         'order' => 'qsequence',
1131         'offset' => scalar(@$newtask_list),
1132       );
1133       push(@$newtask_list, @{$newtask_results->{items}});
1134     } while (@{$newtask_results->{items}});
1135     foreach my $arvados_task (@$newtask_list) {
1136       my $jobstep = {
1137         'level' => $arvados_task->{'sequence'},
1138         'failures' => 0,
1139         'arvados_task' => $arvados_task
1140       };
1141       push @jobstep, $jobstep;
1142       push @jobstep_todo, $#jobstep;
1143     }
1144   }
1145
1146   $progress_is_dirty = 1;
1147   1;
1148 }
1149
1150 sub check_refresh_wanted
1151 {
1152   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1153   if (@stat && $stat[9] > $latest_refresh) {
1154     $latest_refresh = scalar time;
1155     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1156     for my $attr ('cancelled_at',
1157                   'cancelled_by_user_uuid',
1158                   'cancelled_by_client_uuid',
1159                   'state') {
1160       $Job->{$attr} = $Job2->{$attr};
1161     }
1162     if ($Job->{'state'} ne "Running") {
1163       if ($Job->{'state'} eq "Cancelled") {
1164         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1165       } else {
1166         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1167       }
1168       $main::success = 0;
1169       $main::please_freeze = 1;
1170     }
1171   }
1172 }
1173
1174 sub check_squeue
1175 {
1176   # return if the kill list was checked <4 seconds ago
1177   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1178   {
1179     return;
1180   }
1181   $squeue_kill_checked = time;
1182
1183   # use killem() on procs whose killtime is reached
1184   for (keys %proc)
1185   {
1186     if (exists $proc{$_}->{killtime}
1187         && $proc{$_}->{killtime} <= time)
1188     {
1189       killem ($_);
1190     }
1191   }
1192
1193   # return if the squeue was checked <60 seconds ago
1194   if (defined $squeue_checked && $squeue_checked > time - 60)
1195   {
1196     return;
1197   }
1198   $squeue_checked = time;
1199
1200   if (!$have_slurm)
1201   {
1202     # here is an opportunity to check for mysterious problems with local procs
1203     return;
1204   }
1205
1206   # get a list of steps still running
1207   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1208   chop @squeue;
1209   if ($squeue[-1] ne "ok")
1210   {
1211     return;
1212   }
1213   pop @squeue;
1214
1215   # which of my jobsteps are running, according to squeue?
1216   my %ok;
1217   foreach (@squeue)
1218   {
1219     if (/^(\d+)\.(\d+) (\S+)/)
1220     {
1221       if ($1 eq $ENV{SLURM_JOBID})
1222       {
1223         $ok{$3} = 1;
1224       }
1225     }
1226   }
1227
1228   # which of my active child procs (>60s old) were not mentioned by squeue?
1229   foreach (keys %proc)
1230   {
1231     if ($proc{$_}->{time} < time - 60
1232         && !exists $ok{$proc{$_}->{jobstepname}}
1233         && !exists $proc{$_}->{killtime})
1234     {
1235       # kill this proc if it hasn't exited in 30 seconds
1236       $proc{$_}->{killtime} = time + 30;
1237     }
1238   }
1239 }
1240
1241
1242 sub release_allocation
1243 {
1244   if ($have_slurm)
1245   {
1246     Log (undef, "release job allocation");
1247     system "scancel $ENV{SLURM_JOBID}";
1248   }
1249 }
1250
1251
1252 sub readfrompipes
1253 {
1254   my $gotsome = 0;
1255   foreach my $job (keys %reader)
1256   {
1257     my $buf;
1258     while (0 < sysread ($reader{$job}, $buf, 8192))
1259     {
1260       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1261       $jobstep[$job]->{stderr} .= $buf;
1262       preprocess_stderr ($job);
1263       if (length ($jobstep[$job]->{stderr}) > 16384)
1264       {
1265         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1266       }
1267       $gotsome = 1;
1268     }
1269   }
1270   return $gotsome;
1271 }
1272
1273
1274 sub preprocess_stderr
1275 {
1276   my $job = shift;
1277
1278   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1279     my $line = $1;
1280     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1281     Log ($job, "stderr $line");
1282     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1283       # whoa.
1284       $main::please_freeze = 1;
1285     }
1286     elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1287       $jobstep[$job]->{node_fail} = 1;
1288       ban_node_by_slot($jobstep[$job]->{slotindex});
1289     }
1290   }
1291 }
1292
1293
1294 sub process_stderr
1295 {
1296   my $job = shift;
1297   my $task_success = shift;
1298   preprocess_stderr ($job);
1299
1300   map {
1301     Log ($job, "stderr $_");
1302   } split ("\n", $jobstep[$job]->{stderr});
1303 }
1304
1305 sub fetch_block
1306 {
1307   my $hash = shift;
1308   my $keep;
1309   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1310     Log(undef, "fetch_block run error from arv-get $hash: $!");
1311     return undef;
1312   }
1313   my $output_block = "";
1314   while (1) {
1315     my $buf;
1316     my $bytes = sysread($keep, $buf, 1024 * 1024);
1317     if (!defined $bytes) {
1318       Log(undef, "fetch_block read error from arv-get: $!");
1319       $output_block = undef;
1320       last;
1321     } elsif ($bytes == 0) {
1322       # sysread returns 0 at the end of the pipe.
1323       last;
1324     } else {
1325       # some bytes were read into buf.
1326       $output_block .= $buf;
1327     }
1328   }
1329   close $keep;
1330   if ($?) {
1331     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1332     $output_block = undef;
1333   }
1334   return $output_block;
1335 }
1336
1337 # Create a collection by concatenating the output of all tasks (each
1338 # task's output is either a manifest fragment, a locator for a
1339 # manifest fragment stored in Keep, or nothing at all). Return the
1340 # portable_data_hash of the new collection.
1341 sub create_output_collection
1342 {
1343   Log (undef, "collate");
1344
1345   my ($child_out, $child_in);
1346   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1347 import arvados
1348 import sys
1349 print (arvados.api("v1").collections().
1350        create(body={"manifest_text": sys.stdin.read()}).
1351        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1352 }, retry_count());
1353
1354   my $task_idx = -1;
1355   my $manifest_size = 0;
1356   for (@jobstep)
1357   {
1358     ++$task_idx;
1359     my $output = $_->{'arvados_task'}->{output};
1360     next if (!defined($output));
1361     my $next_write;
1362     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1363       $next_write = fetch_block($output);
1364     } else {
1365       $next_write = $output;
1366     }
1367     if (defined($next_write)) {
1368       if (!defined(syswrite($child_in, $next_write))) {
1369         # There's been an error writing.  Stop the loop.
1370         # We'll log details about the exit code later.
1371         last;
1372       } else {
1373         $manifest_size += length($next_write);
1374       }
1375     } else {
1376       my $uuid = $_->{'arvados_task'}->{'uuid'};
1377       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1378       $main::success = 0;
1379     }
1380   }
1381   close($child_in);
1382   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1383
1384   my $joboutput;
1385   my $s = IO::Select->new($child_out);
1386   if ($s->can_read(120)) {
1387     sysread($child_out, $joboutput, 1024 * 1024);
1388     waitpid($pid, 0);
1389     if ($?) {
1390       Log(undef, "output collection creation exited " . exit_status_s($?));
1391       $joboutput = undef;
1392     } else {
1393       chomp($joboutput);
1394     }
1395   } else {
1396     Log (undef, "timed out while creating output collection");
1397     foreach my $signal (2, 2, 2, 15, 15, 9) {
1398       kill($signal, $pid);
1399       last if waitpid($pid, WNOHANG) == -1;
1400       sleep(1);
1401     }
1402   }
1403   close($child_out);
1404
1405   return $joboutput;
1406 }
1407
1408
1409 sub killem
1410 {
1411   foreach (@_)
1412   {
1413     my $sig = 2;                # SIGINT first
1414     if (exists $proc{$_}->{"sent_$sig"} &&
1415         time - $proc{$_}->{"sent_$sig"} > 4)
1416     {
1417       $sig = 15;                # SIGTERM if SIGINT doesn't work
1418     }
1419     if (exists $proc{$_}->{"sent_$sig"} &&
1420         time - $proc{$_}->{"sent_$sig"} > 4)
1421     {
1422       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1423     }
1424     if (!exists $proc{$_}->{"sent_$sig"})
1425     {
1426       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1427       kill $sig, $_;
1428       select (undef, undef, undef, 0.1);
1429       if ($sig == 2)
1430       {
1431         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1432       }
1433       $proc{$_}->{"sent_$sig"} = time;
1434       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1435     }
1436   }
1437 }
1438
1439
1440 sub fhbits
1441 {
1442   my($bits);
1443   for (@_) {
1444     vec($bits,fileno($_),1) = 1;
1445   }
1446   $bits;
1447 }
1448
1449
1450 # Send log output to Keep via arv-put.
1451 #
1452 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1453 # $log_pipe_pid is the pid of the arv-put subprocess.
1454 #
1455 # The only functions that should access these variables directly are:
1456 #
1457 # log_writer_start($logfilename)
1458 #     Starts an arv-put pipe, reading data on stdin and writing it to
1459 #     a $logfilename file in an output collection.
1460 #
1461 # log_writer_send($txt)
1462 #     Writes $txt to the output log collection.
1463 #
1464 # log_writer_finish()
1465 #     Closes the arv-put pipe and returns the output that it produces.
1466 #
1467 # log_writer_is_active()
1468 #     Returns a true value if there is currently a live arv-put
1469 #     process, false otherwise.
1470 #
1471 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1472
1473 sub log_writer_start($)
1474 {
1475   my $logfilename = shift;
1476   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1477                         'arv-put',
1478                         '--portable-data-hash',
1479                         '--project-uuid', $Job->{owner_uuid},
1480                         '--retries', '3',
1481                         '--name', $logfilename,
1482                         '--filename', $logfilename,
1483                         '-');
1484 }
1485
1486 sub log_writer_send($)
1487 {
1488   my $txt = shift;
1489   print $log_pipe_in $txt;
1490 }
1491
1492 sub log_writer_finish()
1493 {
1494   return unless $log_pipe_pid;
1495
1496   close($log_pipe_in);
1497   my $arv_put_output;
1498
1499   my $s = IO::Select->new($log_pipe_out);
1500   if ($s->can_read(120)) {
1501     sysread($log_pipe_out, $arv_put_output, 1024);
1502     chomp($arv_put_output);
1503   } else {
1504     Log (undef, "timed out reading from 'arv-put'");
1505   }
1506
1507   waitpid($log_pipe_pid, 0);
1508   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1509   if ($?) {
1510     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1511   }
1512
1513   return $arv_put_output;
1514 }
1515
1516 sub log_writer_is_active() {
1517   return $log_pipe_pid;
1518 }
1519
1520 sub Log                         # ($jobstep_id, $logmessage)
1521 {
1522   if ($_[1] =~ /\n/) {
1523     for my $line (split (/\n/, $_[1])) {
1524       Log ($_[0], $line);
1525     }
1526     return;
1527   }
1528   my $fh = select STDERR; $|=1; select $fh;
1529   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1530   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1531   $message .= "\n";
1532   my $datetime;
1533   if (log_writer_is_active() || -t STDERR) {
1534     my @gmtime = gmtime;
1535     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1536                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1537   }
1538   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1539
1540   if (log_writer_is_active()) {
1541     log_writer_send($datetime . " " . $message);
1542   }
1543 }
1544
1545
1546 sub croak
1547 {
1548   my ($package, $file, $line) = caller;
1549   my $message = "@_ at $file line $line\n";
1550   Log (undef, $message);
1551   freeze() if @jobstep_todo;
1552   create_output_collection() if @jobstep_todo;
1553   cleanup();
1554   save_meta();
1555   die;
1556 }
1557
1558
1559 sub cleanup
1560 {
1561   return unless $Job;
1562   if ($Job->{'state'} eq 'Cancelled') {
1563     $Job->update_attributes('finished_at' => scalar gmtime);
1564   } else {
1565     $Job->update_attributes('state' => 'Failed');
1566   }
1567 }
1568
1569
1570 sub save_meta
1571 {
1572   my $justcheckpoint = shift; # false if this will be the last meta saved
1573   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1574   return unless log_writer_is_active();
1575
1576   my $loglocator = log_writer_finish();
1577   Log (undef, "log manifest is $loglocator");
1578   $Job->{'log'} = $loglocator;
1579   $Job->update_attributes('log', $loglocator);
1580 }
1581
1582
1583 sub freeze_if_want_freeze
1584 {
1585   if ($main::please_freeze)
1586   {
1587     release_allocation();
1588     if (@_)
1589     {
1590       # kill some srun procs before freeze+stop
1591       map { $proc{$_} = {} } @_;
1592       while (%proc)
1593       {
1594         killem (keys %proc);
1595         select (undef, undef, undef, 0.1);
1596         my $died;
1597         while (($died = waitpid (-1, WNOHANG)) > 0)
1598         {
1599           delete $proc{$died};
1600         }
1601       }
1602     }
1603     freeze();
1604     create_output_collection();
1605     cleanup();
1606     save_meta();
1607     exit 1;
1608   }
1609 }
1610
1611
1612 sub freeze
1613 {
1614   Log (undef, "Freeze not implemented");
1615   return;
1616 }
1617
1618
1619 sub thaw
1620 {
1621   croak ("Thaw not implemented");
1622 }
1623
1624
1625 sub freezequote
1626 {
1627   my $s = shift;
1628   $s =~ s/\\/\\\\/g;
1629   $s =~ s/\n/\\n/g;
1630   return $s;
1631 }
1632
1633
1634 sub freezeunquote
1635 {
1636   my $s = shift;
1637   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1638   return $s;
1639 }
1640
1641
1642 sub srun
1643 {
1644   my $srunargs = shift;
1645   my $execargs = shift;
1646   my $opts = shift || {};
1647   my $stdin = shift;
1648   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1649
1650   $Data::Dumper::Terse = 1;
1651   $Data::Dumper::Indent = 0;
1652   my $show_cmd = Dumper($args);
1653   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1654   $show_cmd =~ s/\n/ /g;
1655   warn "starting: $show_cmd\n";
1656
1657   if (defined $stdin) {
1658     my $child = open STDIN, "-|";
1659     defined $child or die "no fork: $!";
1660     if ($child == 0) {
1661       print $stdin or die $!;
1662       close STDOUT or die $!;
1663       exit 0;
1664     }
1665   }
1666
1667   return system (@$args) if $opts->{fork};
1668
1669   exec @$args;
1670   warn "ENV size is ".length(join(" ",%ENV));
1671   die "exec failed: $!: @$args";
1672 }
1673
1674
1675 sub ban_node_by_slot {
1676   # Don't start any new jobsteps on this node for 60 seconds
1677   my $slotid = shift;
1678   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1679   $slot[$slotid]->{node}->{hold_count}++;
1680   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1681 }
1682
1683 sub must_lock_now
1684 {
1685   my ($lockfile, $error_message) = @_;
1686   open L, ">", $lockfile or croak("$lockfile: $!");
1687   if (!flock L, LOCK_EX|LOCK_NB) {
1688     croak("Can't lock $lockfile: $error_message\n");
1689   }
1690 }
1691
1692 sub find_docker_image {
1693   # Given a Keep locator, check to see if it contains a Docker image.
1694   # If so, return its stream name and Docker hash.
1695   # If not, return undef for both values.
1696   my $locator = shift;
1697   my ($streamname, $filename);
1698   my $image = api_call("collections/get", uuid => $locator);
1699   if ($image) {
1700     foreach my $line (split(/\n/, $image->{manifest_text})) {
1701       my @tokens = split(/\s+/, $line);
1702       next if (!@tokens);
1703       $streamname = shift(@tokens);
1704       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1705         if (defined($filename)) {
1706           return (undef, undef);  # More than one file in the Collection.
1707         } else {
1708           $filename = (split(/:/, $filedata, 3))[2];
1709         }
1710       }
1711     }
1712   }
1713   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1714     return ($streamname, $1);
1715   } else {
1716     return (undef, undef);
1717   }
1718 }
1719
1720 sub retry_count {
1721   # Calculate the number of times an operation should be retried,
1722   # assuming exponential backoff, and that we're willing to retry as
1723   # long as tasks have been running.  Enforce a minimum of 3 retries.
1724   my ($starttime, $endtime, $timediff, $retries);
1725   if (@jobstep) {
1726     $starttime = $jobstep[0]->{starttime};
1727     $endtime = $jobstep[-1]->{finishtime};
1728   }
1729   if (!defined($starttime)) {
1730     $timediff = 0;
1731   } elsif (!defined($endtime)) {
1732     $timediff = time - $starttime;
1733   } else {
1734     $timediff = ($endtime - $starttime) - (time - $endtime);
1735   }
1736   if ($timediff > 0) {
1737     $retries = int(log($timediff) / log(2));
1738   } else {
1739     $retries = 1;  # Use the minimum.
1740   }
1741   return ($retries > 3) ? $retries : 3;
1742 }
1743
1744 sub retry_op {
1745   # Pass in two function references.
1746   # This method will be called with the remaining arguments.
1747   # If it dies, retry it with exponential backoff until it succeeds,
1748   # or until the current retry_count is exhausted.  After each failure
1749   # that can be retried, the second function will be called with
1750   # the current try count (0-based), next try time, and error message.
1751   my $operation = shift;
1752   my $retry_callback = shift;
1753   my $retries = retry_count();
1754   foreach my $try_count (0..$retries) {
1755     my $next_try = time + (2 ** $try_count);
1756     my $result = eval { $operation->(@_); };
1757     if (!$@) {
1758       return $result;
1759     } elsif ($try_count < $retries) {
1760       $retry_callback->($try_count, $next_try, $@);
1761       my $sleep_time = $next_try - time;
1762       sleep($sleep_time) if ($sleep_time > 0);
1763     }
1764   }
1765   # Ensure the error message ends in a newline, so Perl doesn't add
1766   # retry_op's line number to it.
1767   chomp($@);
1768   die($@ . "\n");
1769 }
1770
1771 sub api_call {
1772   # Pass in a /-separated API method name, and arguments for it.
1773   # This function will call that method, retrying as needed until
1774   # the current retry_count is exhausted, with a log on the first failure.
1775   my $method_name = shift;
1776   my $log_api_retry = sub {
1777     my ($try_count, $next_try_at, $errmsg) = @_;
1778     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1779     $errmsg =~ s/\s/ /g;
1780     $errmsg =~ s/\s+$//;
1781     my $retry_msg;
1782     if ($next_try_at < time) {
1783       $retry_msg = "Retrying.";
1784     } else {
1785       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1786       $retry_msg = "Retrying at $next_try_fmt.";
1787     }
1788     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1789   };
1790   my $method = $arv;
1791   foreach my $key (split(/\//, $method_name)) {
1792     $method = $method->{$key};
1793   }
1794   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1795 }
1796
1797 sub exit_status_s {
1798   # Given a $?, return a human-readable exit code string like "0" or
1799   # "1" or "0 with signal 1" or "1 with signal 11".
1800   my $exitcode = shift;
1801   my $s = $exitcode >> 8;
1802   if ($exitcode & 0x7f) {
1803     $s .= " with signal " . ($exitcode & 0x7f);
1804   }
1805   if ($exitcode & 0x80) {
1806     $s .= " with core dump";
1807   }
1808   return $s;
1809 }
1810
1811 sub handle_readall {
1812   # Pass in a glob reference to a file handle.
1813   # Read all its contents and return them as a string.
1814   my $fh_glob_ref = shift;
1815   local $/ = undef;
1816   return <$fh_glob_ref>;
1817 }
1818
1819 sub tar_filename_n {
1820   my $n = shift;
1821   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1822 }
1823
1824 sub add_git_archive {
1825   # Pass in a git archive command as a string or list, a la system().
1826   # This method will save its output to be included in the archive sent to the
1827   # build script.
1828   my $git_input;
1829   $git_tar_count++;
1830   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1831     croak("Failed to save git archive: $!");
1832   }
1833   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1834   close($git_input);
1835   waitpid($git_pid, 0);
1836   close(GIT_ARCHIVE);
1837   if ($?) {
1838     croak("Failed to save git archive: git exited " . exit_status_s($?));
1839   }
1840 }
1841
1842 sub combined_git_archive {
1843   # Combine all saved tar archives into a single archive, then return its
1844   # contents in a string.  Return undef if no archives have been saved.
1845   if ($git_tar_count < 1) {
1846     return undef;
1847   }
1848   my $base_tar_name = tar_filename_n(1);
1849   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1850     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1851     if ($tar_exit != 0) {
1852       croak("Error preparing build archive: tar -A exited " .
1853             exit_status_s($tar_exit));
1854     }
1855   }
1856   if (!open(GIT_TAR, "<", $base_tar_name)) {
1857     croak("Could not open build archive: $!");
1858   }
1859   my $tar_contents = handle_readall(\*GIT_TAR);
1860   close(GIT_TAR);
1861   return $tar_contents;
1862 }
1863
1864 __DATA__
1865 #!/usr/bin/perl
1866 #
1867 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1868 # server invokes this script on individual compute nodes, or localhost if we're
1869 # running a job locally.  It gets called in two modes:
1870 #
1871 # * No arguments: Installation mode.  Read a tar archive from the DATA
1872 #   file handle; it includes the Crunch script's source code, and
1873 #   maybe SDKs as well.  Those should be installed in the proper
1874 #   locations.  This runs outside of any Docker container, so don't try to
1875 #   introspect Crunch's runtime environment.
1876 #
1877 # * With arguments: Crunch script run mode.  This script should set up the
1878 #   environment, then run the command specified in the arguments.  This runs
1879 #   inside any Docker container.
1880
1881 use Fcntl ':flock';
1882 use File::Path qw( make_path remove_tree );
1883 use POSIX qw(getcwd);
1884
1885 use constant TASK_TEMPFAIL => 111;
1886
1887 # Map SDK subdirectories to the path environments they belong to.
1888 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1889
1890 my $destdir = $ENV{"CRUNCH_SRC"};
1891 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1892 my $repo = $ENV{"CRUNCH_SRC_URL"};
1893 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1894 my $job_work = $ENV{"JOB_WORK"};
1895 my $task_work = $ENV{"TASK_WORK"};
1896
1897 for my $dir ($destdir, $job_work, $task_work) {
1898   if ($dir) {
1899     make_path $dir;
1900     -e $dir or die "Failed to create temporary directory ($dir): $!";
1901   }
1902 }
1903
1904 if ($task_work) {
1905   remove_tree($task_work, {keep_root => 1});
1906 }
1907
1908 open(STDOUT_ORIG, ">&", STDOUT);
1909 open(STDERR_ORIG, ">&", STDERR);
1910 open(STDOUT, ">>", "$destdir.log");
1911 open(STDERR, ">&", STDOUT);
1912
1913 ### Crunch script run mode
1914 if (@ARGV) {
1915   # We want to do routine logging during task 0 only.  This gives the user
1916   # the information they need, but avoids repeating the information for every
1917   # task.
1918   my $Log;
1919   if ($ENV{TASK_SEQUENCE} eq "0") {
1920     $Log = sub {
1921       my $msg = shift;
1922       printf STDERR_ORIG "[Crunch] $msg\n", @_;
1923     };
1924   } else {
1925     $Log = sub { };
1926   }
1927
1928   my $python_src = "$install_dir/python";
1929   my $venv_dir = "$job_work/.arvados.venv";
1930   my $venv_built = -e "$venv_dir/bin/activate";
1931   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1932     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
1933                  "--python=python2.7", $venv_dir);
1934     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1935     $venv_built = 1;
1936     $Log->("Built Python SDK virtualenv");
1937   }
1938
1939   my $pip_bin = "pip";
1940   if ($venv_built) {
1941     $Log->("Running in Python SDK virtualenv");
1942     $pip_bin = "$venv_dir/bin/pip";
1943     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
1944     @ARGV = ("/bin/sh", "-ec",
1945              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
1946   } elsif (-d $python_src) {
1947     $Log->("Warning: virtualenv not found inside Docker container default " .
1948            "\$PATH. Can't install Python SDK.");
1949   }
1950
1951   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
1952   if ($pkgs) {
1953     $Log->("Using Arvados SDK:");
1954     foreach my $line (split /\n/, $pkgs) {
1955       $Log->($line);
1956     }
1957   } else {
1958     $Log->("Arvados SDK packages not found");
1959   }
1960
1961   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
1962     my $sdk_path = "$install_dir/$sdk_dir";
1963     if (-d $sdk_path) {
1964       if ($ENV{$sdk_envkey}) {
1965         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
1966       } else {
1967         $ENV{$sdk_envkey} = $sdk_path;
1968       }
1969       $Log->("Arvados SDK added to %s", $sdk_envkey);
1970     }
1971   }
1972
1973   close(STDOUT);
1974   close(STDERR);
1975   open(STDOUT, ">&", STDOUT_ORIG);
1976   open(STDERR, ">&", STDERR_ORIG);
1977   exec(@ARGV);
1978   die "Cannot exec `@ARGV`: $!";
1979 }
1980
1981 ### Installation mode
1982 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1983 flock L, LOCK_EX;
1984 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1985   # This version already installed -> nothing to do.
1986   exit(0);
1987 }
1988
1989 unlink "$destdir.commit";
1990 mkdir $destdir;
1991
1992 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
1993   die "Error launching 'tar -xC $destdir': $!";
1994 }
1995 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
1996 # get SIGPIPE.  We must feed it data incrementally.
1997 my $tar_input;
1998 while (read(DATA, $tar_input, 65536)) {
1999   print TARX $tar_input;
2000 }
2001 if(!close(TARX)) {
2002   die "'tar -xC $destdir' exited $?: $!";
2003 }
2004
2005 mkdir $install_dir;
2006
2007 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2008 if (-d $sdk_root) {
2009   foreach my $sdk_lang (("python",
2010                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2011     if (-d "$sdk_root/$sdk_lang") {
2012       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2013         die "Failed to install $sdk_lang SDK: $!";
2014       }
2015     }
2016   }
2017 }
2018
2019 my $python_dir = "$install_dir/python";
2020 if ((-d $python_dir) and can_run("python2.7") and
2021     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
2022   # egg_info failed, probably when it asked git for a build tag.
2023   # Specify no build tag.
2024   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2025   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2026   close($pysdk_cfg);
2027 }
2028
2029 if (-e "$destdir/crunch_scripts/install") {
2030     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2031 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2032     # Old version
2033     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2034 } elsif (-e "./install.sh") {
2035     shell_or_die (undef, "./install.sh", $install_dir);
2036 }
2037
2038 if ($commit) {
2039     unlink "$destdir.commit.new";
2040     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
2041     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
2042 }
2043
2044 close L;
2045
2046 sub can_run {
2047   my $command_name = shift;
2048   open(my $which, "-|", "which", $command_name);
2049   while (<$which>) { }
2050   close($which);
2051   return ($? == 0);
2052 }
2053
2054 sub shell_or_die
2055 {
2056   my $exitcode = shift;
2057
2058   if ($ENV{"DEBUG"}) {
2059     print STDERR "@_\n";
2060   }
2061   if (system (@_) != 0) {
2062     my $err = $!;
2063     my $code = $?;
2064     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2065     open STDERR, ">&STDERR_ORIG";
2066     system ("cat $destdir.log >&2");
2067     warn "@_ failed ($err): $exitstatus";
2068     if (defined($exitcode)) {
2069       exit $exitcode;
2070     }
2071     else {
2072       exit (($code >> 8) || 1);
2073     }
2074   }
2075 }
2076
2077 __DATA__