Merge branch 'master' into 6087-collection-timing
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105   if ($ENV{"USER"} ne "crunch" && $< != 0) {
106     # use a tmp dir unique for my uid
107     $ENV{"CRUNCH_TMP"} .= "-$<";
108   }
109 }
110
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
114 }
115
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
120
121 my %proc;
122 my $force_unlock;
123 my $git_dir;
124 my $jobspec;
125 my $job_api_token;
126 my $no_clear_tmp;
127 my $resume_stash;
128 GetOptions('force-unlock' => \$force_unlock,
129            'git-dir=s' => \$git_dir,
130            'job=s' => \$jobspec,
131            'job-api-token=s' => \$job_api_token,
132            'no-clear-tmp' => \$no_clear_tmp,
133            'resume-stash=s' => \$resume_stash,
134     );
135
136 if (defined $job_api_token) {
137   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
138 }
139
140 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
141 my $local_job = 0;
142
143
144 $SIG{'USR1'} = sub
145 {
146   $main::ENV{CRUNCH_DEBUG} = 1;
147 };
148 $SIG{'USR2'} = sub
149 {
150   $main::ENV{CRUNCH_DEBUG} = 0;
151 };
152
153
154
155 my $arv = Arvados->new('apiVersion' => 'v1');
156
157 my $Job;
158 my $job_id;
159 my $dbh;
160 my $sth;
161 my @jobstep;
162
163 my $User = api_call("users/current");
164
165 if ($jobspec =~ /^[-a-z\d]+$/)
166 {
167   # $jobspec is an Arvados UUID, not a JSON job specification
168   $Job = api_call("jobs/get", uuid => $jobspec);
169   if (!$force_unlock) {
170     # Claim this job, and make sure nobody else does
171     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
172     if ($@) {
173       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
174       exit EX_TEMPFAIL;
175     };
176   }
177 }
178 else
179 {
180   $Job = JSON::decode_json($jobspec);
181
182   if (!$resume_stash)
183   {
184     map { croak ("No $_ specified") unless $Job->{$_} }
185     qw(script script_version script_parameters);
186   }
187
188   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
189   $Job->{'started_at'} = gmtime;
190   $Job->{'state'} = 'Running';
191
192   $Job = api_call("jobs/create", job => $Job);
193 }
194 $job_id = $Job->{'uuid'};
195
196 my $keep_logfile = $job_id . '.log.txt';
197 log_writer_start($keep_logfile);
198
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202
203 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
204 if ($? == 0) {
205   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
206   chomp($gem_versions);
207   chop($gem_versions);  # Closing parentheses
208 } else {
209   $gem_versions = "";
210 }
211 Log(undef,
212     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
213
214 Log (undef, "check slurm allocation");
215 my @slot;
216 my @node;
217 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
218 my @sinfo;
219 if (!$have_slurm)
220 {
221   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
222   push @sinfo, "$localcpus localhost";
223 }
224 if (exists $ENV{SLURM_NODELIST})
225 {
226   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
227 }
228 foreach (@sinfo)
229 {
230   my ($ncpus, $slurm_nodelist) = split;
231   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
232
233   my @nodelist;
234   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
235   {
236     my $nodelist = $1;
237     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
238     {
239       my $ranges = $1;
240       foreach (split (",", $ranges))
241       {
242         my ($a, $b);
243         if (/(\d+)-(\d+)/)
244         {
245           $a = $1;
246           $b = $2;
247         }
248         else
249         {
250           $a = $_;
251           $b = $_;
252         }
253         push @nodelist, map {
254           my $n = $nodelist;
255           $n =~ s/\[[-,\d]+\]/$_/;
256           $n;
257         } ($a..$b);
258       }
259     }
260     else
261     {
262       push @nodelist, $nodelist;
263     }
264   }
265   foreach my $nodename (@nodelist)
266   {
267     Log (undef, "node $nodename - $ncpus slots");
268     my $node = { name => $nodename,
269                  ncpus => $ncpus,
270                  losing_streak => 0,
271                  hold_until => 0 };
272     foreach my $cpu (1..$ncpus)
273     {
274       push @slot, { node => $node,
275                     cpu => $cpu };
276     }
277   }
278   push @node, @nodelist;
279 }
280
281
282
283 # Ensure that we get one jobstep running on each allocated node before
284 # we start overloading nodes with concurrent steps
285
286 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
287
288
289 $Job->update_attributes(
290   'tasks_summary' => { 'failed' => 0,
291                        'todo' => 1,
292                        'running' => 0,
293                        'done' => 0 });
294
295 Log (undef, "start");
296 $SIG{'INT'} = sub { $main::please_freeze = 1; };
297 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
298 $SIG{'TERM'} = \&croak;
299 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
300 $SIG{'ALRM'} = sub { $main::please_info = 1; };
301 $SIG{'CONT'} = sub { $main::please_continue = 1; };
302 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
303
304 $main::please_freeze = 0;
305 $main::please_info = 0;
306 $main::please_continue = 0;
307 $main::please_refresh = 0;
308 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
309
310 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
311 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
312 $ENV{"JOB_UUID"} = $job_id;
313
314
315 my @jobstep_todo = ();
316 my @jobstep_done = ();
317 my @jobstep_tomerge = ();
318 my $jobstep_tomerge_level = 0;
319 my $squeue_checked;
320 my $squeue_kill_checked;
321 my $latest_refresh = scalar time;
322
323
324
325 if (defined $Job->{thawedfromkey})
326 {
327   thaw ($Job->{thawedfromkey});
328 }
329 else
330 {
331   my $first_task = api_call("job_tasks/create", job_task => {
332     'job_uuid' => $Job->{'uuid'},
333     'sequence' => 0,
334     'qsequence' => 0,
335     'parameters' => {},
336   });
337   push @jobstep, { 'level' => 0,
338                    'failures' => 0,
339                    'arvados_task' => $first_task,
340                  };
341   push @jobstep_todo, 0;
342 }
343
344
345 if (!$have_slurm)
346 {
347   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
348 }
349
350 my $build_script = handle_readall(\*DATA);
351 my $nodelist = join(",", @node);
352 my $git_tar_count = 0;
353
354 if (!defined $no_clear_tmp) {
355   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
356   Log (undef, "Clean work dirs");
357
358   my $cleanpid = fork();
359   if ($cleanpid == 0)
360   {
361     # Find FUSE mounts that look like Keep mounts (the mount path has the
362     # word "keep") and unmount them.  Then clean up work directories.
363     # TODO: When #5036 is done and widely deployed, we can get rid of the
364     # regular expression and just unmount everything with type fuse.keep.
365     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
366           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
367     exit (1);
368   }
369   while (1)
370   {
371     last if $cleanpid == waitpid (-1, WNOHANG);
372     freeze_if_want_freeze ($cleanpid);
373     select (undef, undef, undef, 0.1);
374   }
375   Log (undef, "Cleanup command exited ".exit_status_s($?));
376 }
377
378 # If this job requires a Docker image, install that.
379 my $docker_bin = "/usr/bin/docker.io";
380 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
381 if ($docker_locator = $Job->{docker_image_locator}) {
382   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
383   if (!$docker_hash)
384   {
385     croak("No Docker image hash found from locator $docker_locator");
386   }
387   $docker_stream =~ s/^\.//;
388   my $docker_install_script = qq{
389 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
390     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
391 fi
392 };
393   my $docker_pid = fork();
394   if ($docker_pid == 0)
395   {
396     srun (["srun", "--nodelist=" . join(',', @node)],
397           ["/bin/sh", "-ec", $docker_install_script]);
398     exit ($?);
399   }
400   while (1)
401   {
402     last if $docker_pid == waitpid (-1, WNOHANG);
403     freeze_if_want_freeze ($docker_pid);
404     select (undef, undef, undef, 0.1);
405   }
406   if ($? != 0)
407   {
408     croak("Installing Docker image from $docker_locator exited "
409           .exit_status_s($?));
410   }
411
412   # Determine whether this version of Docker supports memory+swap limits.
413   srun(["srun", "--nodelist=" . $node[0]],
414        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
415       {fork => 1});
416   $docker_limitmem = ($? == 0);
417
418   if ($Job->{arvados_sdk_version}) {
419     # The job also specifies an Arvados SDK version.  Add the SDKs to the
420     # tar file for the build script to install.
421     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
422                        $Job->{arvados_sdk_version}));
423     add_git_archive("git", "--git-dir=$git_dir", "archive",
424                     "--prefix=.arvados.sdk/",
425                     $Job->{arvados_sdk_version}, "sdk");
426   }
427 }
428
429 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
430   # If script_version looks like an absolute path, *and* the --git-dir
431   # argument was not given -- which implies we were not invoked by
432   # crunch-dispatch -- we will use the given path as a working
433   # directory instead of resolving script_version to a git commit (or
434   # doing anything else with git).
435   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
436   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
437 }
438 else {
439   # Resolve the given script_version to a git commit sha1. Also, if
440   # the repository is remote, clone it into our local filesystem: this
441   # ensures "git archive" will work, and is necessary to reliably
442   # resolve a symbolic script_version like "master^".
443   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
444
445   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
446
447   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
448
449   # If we're running under crunch-dispatch, it will have already
450   # pulled the appropriate source tree into its own repository, and
451   # given us that repo's path as $git_dir.
452   #
453   # If we're running a "local" job, we might have to fetch content
454   # from a remote repository.
455   #
456   # (Currently crunch-dispatch gives a local path with --git-dir, but
457   # we might as well accept URLs there too in case it changes its
458   # mind.)
459   my $repo = $git_dir || $Job->{'repository'};
460
461   # Repository can be remote or local. If remote, we'll need to fetch it
462   # to a local dir before doing `git log` et al.
463   my $repo_location;
464
465   if ($repo =~ m{://|^[^/]*:}) {
466     # $repo is a git url we can clone, like git:// or https:// or
467     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
468     # not recognized here because distinguishing that from a local
469     # path is too fragile. If you really need something strange here,
470     # use the ssh:// form.
471     $repo_location = 'remote';
472   } elsif ($repo =~ m{^\.*/}) {
473     # $repo is a local path to a git index. We'll also resolve ../foo
474     # to ../foo/.git if the latter is a directory. To help
475     # disambiguate local paths from named hosted repositories, this
476     # form must be given as ./ or ../ if it's a relative path.
477     if (-d "$repo/.git") {
478       $repo = "$repo/.git";
479     }
480     $repo_location = 'local';
481   } else {
482     # $repo is none of the above. It must be the name of a hosted
483     # repository.
484     my $arv_repo_list = api_call("repositories/list",
485                                  'filters' => [['name','=',$repo]]);
486     my @repos_found = @{$arv_repo_list->{'items'}};
487     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
488     if ($n_found > 0) {
489       Log(undef, "Repository '$repo' -> "
490           . join(", ", map { $_->{'uuid'} } @repos_found));
491     }
492     if ($n_found != 1) {
493       croak("Error: Found $n_found repositories with name '$repo'.");
494     }
495     $repo = $repos_found[0]->{'fetch_url'};
496     $repo_location = 'remote';
497   }
498   Log(undef, "Using $repo_location repository '$repo'");
499   $ENV{"CRUNCH_SRC_URL"} = $repo;
500
501   # Resolve given script_version (we'll call that $treeish here) to a
502   # commit sha1 ($commit).
503   my $treeish = $Job->{'script_version'};
504   my $commit;
505   if ($repo_location eq 'remote') {
506     # We minimize excess object-fetching by re-using the same bare
507     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
508     # just keep adding remotes to it as needed.
509     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
510     my $gitcmd = "git --git-dir=\Q$local_repo\E";
511
512     # Set up our local repo for caching remote objects, making
513     # archives, etc.
514     if (!-d $local_repo) {
515       make_path($local_repo) or croak("Error: could not create $local_repo");
516     }
517     # This works (exits 0 and doesn't delete fetched objects) even
518     # if $local_repo is already initialized:
519     `$gitcmd init --bare`;
520     if ($?) {
521       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
522     }
523
524     # If $treeish looks like a hash (or abbrev hash) we look it up in
525     # our local cache first, since that's cheaper. (We don't want to
526     # do that with tags/branches though -- those change over time, so
527     # they should always be resolved by the remote repo.)
528     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
529       # Hide stderr because it's normal for this to fail:
530       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
531       if ($? == 0 &&
532           # Careful not to resolve a branch named abcdeff to commit 1234567:
533           $sha1 =~ /^$treeish/ &&
534           $sha1 =~ /^([0-9a-f]{40})$/s) {
535         $commit = $1;
536         Log(undef, "Commit $commit already present in $local_repo");
537       }
538     }
539
540     if (!defined $commit) {
541       # If $treeish isn't just a hash or abbrev hash, or isn't here
542       # yet, we need to fetch the remote to resolve it correctly.
543
544       # First, remove all local heads. This prevents a name that does
545       # not exist on the remote from resolving to (or colliding with)
546       # a previously fetched branch or tag (possibly from a different
547       # remote).
548       remove_tree("$local_repo/refs/heads", {keep_root => 1});
549
550       Log(undef, "Fetching objects from $repo to $local_repo");
551       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
552       if ($?) {
553         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
554       }
555     }
556
557     # Now that the data is all here, we will use our local repo for
558     # the rest of our git activities.
559     $repo = $local_repo;
560   }
561
562   my $gitcmd = "git --git-dir=\Q$repo\E";
563   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
564   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
565     croak("`$gitcmd rev-list` exited "
566           .exit_status_s($?)
567           .", '$treeish' not found. Giving up.");
568   }
569   $commit = $1;
570   Log(undef, "Version $treeish is commit $commit");
571
572   if ($commit ne $Job->{'script_version'}) {
573     # Record the real commit id in the database, frozentokey, logs,
574     # etc. -- instead of an abbreviation or a branch name which can
575     # become ambiguous or point to a different commit in the future.
576     if (!$Job->update_attributes('script_version' => $commit)) {
577       croak("Error: failed to update job's script_version attribute");
578     }
579   }
580
581   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
582   add_git_archive("$gitcmd archive ''\Q$commit\E");
583 }
584
585 my $git_archive = combined_git_archive();
586 if (!defined $git_archive) {
587   Log(undef, "Skip install phase (no git archive)");
588   if ($have_slurm) {
589     Log(undef, "Warning: This probably means workers have no source tree!");
590   }
591 }
592 else {
593   my $install_exited;
594   my $install_script_tries_left = 3;
595   for (my $attempts = 0; $attempts < 3; $attempts++) {
596     Log(undef, "Run install script on all workers");
597
598     my @srunargs = ("srun",
599                     "--nodelist=$nodelist",
600                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
601     my @execargs = ("sh", "-c",
602                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
603
604     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
605     my ($install_stderr_r, $install_stderr_w);
606     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
607     set_nonblocking($install_stderr_r);
608     my $installpid = fork();
609     if ($installpid == 0)
610     {
611       close($install_stderr_r);
612       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
613       open(STDOUT, ">&", $install_stderr_w);
614       open(STDERR, ">&", $install_stderr_w);
615       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
616       exit (1);
617     }
618     close($install_stderr_w);
619     # Tell freeze_if_want_freeze how to kill the child, otherwise the
620     # "waitpid(installpid)" loop won't get interrupted by a freeze:
621     $proc{$installpid} = {};
622     my $stderr_buf = '';
623     # Track whether anything appears on stderr other than slurm errors
624     # ("srun: ...") and the "starting: ..." message printed by the
625     # srun subroutine itself:
626     my $stderr_anything_from_script = 0;
627     my $match_our_own_errors = '^(srun: error: |starting: \[)';
628     while ($installpid != waitpid(-1, WNOHANG)) {
629       freeze_if_want_freeze ($installpid);
630       # Wait up to 0.1 seconds for something to appear on stderr, then
631       # do a non-blocking read.
632       my $bits = fhbits($install_stderr_r);
633       select ($bits, undef, $bits, 0.1);
634       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
635       {
636         while ($stderr_buf =~ /^(.*?)\n/) {
637           my $line = $1;
638           substr $stderr_buf, 0, 1+length($line), "";
639           Log(undef, "stderr $line");
640           if ($line !~ /$match_our_own_errors/) {
641             $stderr_anything_from_script = 1;
642           }
643         }
644       }
645     }
646     delete $proc{$installpid};
647     $install_exited = $?;
648     close($install_stderr_r);
649     if (length($stderr_buf) > 0) {
650       if ($stderr_buf !~ /$match_our_own_errors/) {
651         $stderr_anything_from_script = 1;
652       }
653       Log(undef, "stderr $stderr_buf")
654     }
655
656     Log (undef, "Install script exited ".exit_status_s($install_exited));
657     last if $install_exited == 0 || $main::please_freeze;
658     # If the install script fails but doesn't print an error message,
659     # the next thing anyone is likely to do is just run it again in
660     # case it was a transient problem like "slurm communication fails
661     # because the network isn't reliable enough". So we'll just do
662     # that ourselves (up to 3 attempts in total). OTOH, if there is an
663     # error message, the problem is more likely to have a real fix and
664     # we should fail the job so the fixing process can start, instead
665     # of doing 2 more attempts.
666     last if $stderr_anything_from_script;
667   }
668
669   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
670     unlink($tar_filename);
671   }
672
673   if ($install_exited != 0) {
674     croak("Giving up");
675   }
676 }
677
678 foreach (qw (script script_version script_parameters runtime_constraints))
679 {
680   Log (undef,
681        "$_ " .
682        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
683 }
684 foreach (split (/\n/, $Job->{knobs}))
685 {
686   Log (undef, "knob " . $_);
687 }
688
689
690
691 $main::success = undef;
692
693
694
695 ONELEVEL:
696
697 my $thisround_succeeded = 0;
698 my $thisround_failed = 0;
699 my $thisround_failed_multiple = 0;
700
701 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
702                        or $a <=> $b } @jobstep_todo;
703 my $level = $jobstep[$jobstep_todo[0]]->{level};
704
705 my $initial_tasks_this_level = 0;
706 foreach my $id (@jobstep_todo) {
707   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
708 }
709
710 # If the number of tasks scheduled at this level #T is smaller than the number
711 # of slots available #S, only use the first #T slots, or the first slot on
712 # each node, whichever number is greater.
713 #
714 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
715 # based on these numbers.  Using fewer slots makes more resources available
716 # to each individual task, which should normally be a better strategy when
717 # there are fewer of them running with less parallelism.
718 #
719 # Note that this calculation is not redone if the initial tasks at
720 # this level queue more tasks at the same level.  This may harm
721 # overall task throughput for that level.
722 my @freeslot;
723 if ($initial_tasks_this_level < @node) {
724   @freeslot = (0..$#node);
725 } elsif ($initial_tasks_this_level < @slot) {
726   @freeslot = (0..$initial_tasks_this_level - 1);
727 } else {
728   @freeslot = (0..$#slot);
729 }
730 my $round_num_freeslots = scalar(@freeslot);
731
732 my %round_max_slots = ();
733 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
734   my $this_slot = $slot[$freeslot[$ii]];
735   my $node_name = $this_slot->{node}->{name};
736   $round_max_slots{$node_name} ||= $this_slot->{cpu};
737   last if (scalar(keys(%round_max_slots)) >= @node);
738 }
739
740 Log(undef, "start level $level with $round_num_freeslots slots");
741 my @holdslot;
742 my %reader;
743 my $progress_is_dirty = 1;
744 my $progress_stats_updated = 0;
745
746 update_progress_stats();
747
748
749 THISROUND:
750 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
751 {
752   my $id = $jobstep_todo[$todo_ptr];
753   my $Jobstep = $jobstep[$id];
754   if ($Jobstep->{level} != $level)
755   {
756     next;
757   }
758
759   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
760   set_nonblocking($reader{$id});
761
762   my $childslot = $freeslot[0];
763   my $childnode = $slot[$childslot]->{node};
764   my $childslotname = join (".",
765                             $slot[$childslot]->{node}->{name},
766                             $slot[$childslot]->{cpu});
767
768   my $childpid = fork();
769   if ($childpid == 0)
770   {
771     $SIG{'INT'} = 'DEFAULT';
772     $SIG{'QUIT'} = 'DEFAULT';
773     $SIG{'TERM'} = 'DEFAULT';
774
775     foreach (values (%reader))
776     {
777       close($_);
778     }
779     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
780     open(STDOUT,">&writer");
781     open(STDERR,">&writer");
782
783     undef $dbh;
784     undef $sth;
785
786     delete $ENV{"GNUPGHOME"};
787     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
788     $ENV{"TASK_QSEQUENCE"} = $id;
789     $ENV{"TASK_SEQUENCE"} = $level;
790     $ENV{"JOB_SCRIPT"} = $Job->{script};
791     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
792       $param =~ tr/a-z/A-Z/;
793       $ENV{"JOB_PARAMETER_$param"} = $value;
794     }
795     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
796     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
797     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
798     $ENV{"HOME"} = $ENV{"TASK_WORK"};
799     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
800     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
801     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
802     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
803
804     $ENV{"GZIP"} = "-n";
805
806     my @srunargs = (
807       "srun",
808       "--nodelist=".$childnode->{name},
809       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
810       "--job-name=$job_id.$id.$$",
811         );
812     my $command =
813         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
814         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
815         ."&& cd $ENV{CRUNCH_TMP} "
816         # These environment variables get used explicitly later in
817         # $command.  No tool is expected to read these values directly.
818         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
819         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
820         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
821         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
822     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
823     if ($docker_hash)
824     {
825       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
826       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
827       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
828       # We only set memory limits if Docker lets us limit both memory and swap.
829       # Memory limits alone have been supported longer, but subprocesses tend
830       # to get SIGKILL if they exceed that without any swap limit set.
831       # See #5642 for additional background.
832       if ($docker_limitmem) {
833         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
834       }
835
836       # Dynamically configure the container to use the host system as its
837       # DNS server.  Get the host's global addresses from the ip command,
838       # and turn them into docker --dns options using gawk.
839       $command .=
840           q{$(ip -o address show scope global |
841               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
842
843       # The source tree and $destdir directory (which we have
844       # installed on the worker host) are available in the container,
845       # under the same path.
846       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
847       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
848
849       # Currently, we make arv-mount's mount point appear at /keep
850       # inside the container (instead of using the same path as the
851       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
852       # crunch scripts and utilities must not rely on this. They must
853       # use $TASK_KEEPMOUNT.
854       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
855       $ENV{TASK_KEEPMOUNT} = "/keep";
856
857       # TASK_WORK is almost exactly like a docker data volume: it
858       # starts out empty, is writable, and persists until no
859       # containers use it any more. We don't use --volumes-from to
860       # share it with other containers: it is only accessible to this
861       # task, and it goes away when this task stops.
862       #
863       # However, a docker data volume is writable only by root unless
864       # the mount point already happens to exist in the container with
865       # different permissions. Therefore, we [1] assume /tmp already
866       # exists in the image and is writable by the crunch user; [2]
867       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
868       # writable if they are created by docker while setting up the
869       # other --volumes); and [3] create $TASK_WORK inside the
870       # container using $build_script.
871       $command .= "--volume=/tmp ";
872       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
873       $ENV{"HOME"} = $ENV{"TASK_WORK"};
874       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
875
876       # TODO: Share a single JOB_WORK volume across all task
877       # containers on a given worker node, and delete it when the job
878       # ends (and, in case that doesn't work, when the next job
879       # starts).
880       #
881       # For now, use the same approach as TASK_WORK above.
882       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
883
884       while (my ($env_key, $env_val) = each %ENV)
885       {
886         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
887           $command .= "--env=\Q$env_key=$env_val\E ";
888         }
889       }
890       $command .= "--env=\QHOME=$ENV{HOME}\E ";
891       $command .= "\Q$docker_hash\E ";
892       $command .= "stdbuf --output=0 --error=0 ";
893       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
894     } else {
895       # Non-docker run
896       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
897       $command .= "stdbuf --output=0 --error=0 ";
898       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
899     }
900
901     my @execargs = ('bash', '-c', $command);
902     srun (\@srunargs, \@execargs, undef, $build_script);
903     # exec() failed, we assume nothing happened.
904     die "srun() failed on build script\n";
905   }
906   close("writer");
907   if (!defined $childpid)
908   {
909     close $reader{$id};
910     delete $reader{$id};
911     next;
912   }
913   shift @freeslot;
914   $proc{$childpid} = { jobstep => $id,
915                        time => time,
916                        slot => $childslot,
917                        jobstepname => "$job_id.$id.$childpid",
918                      };
919   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
920   $slot[$childslot]->{pid} = $childpid;
921
922   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
923   Log ($id, "child $childpid started on $childslotname");
924   $Jobstep->{starttime} = time;
925   $Jobstep->{node} = $childnode->{name};
926   $Jobstep->{slotindex} = $childslot;
927   delete $Jobstep->{stderr};
928   delete $Jobstep->{finishtime};
929
930   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
931   $Jobstep->{'arvados_task'}->save;
932
933   splice @jobstep_todo, $todo_ptr, 1;
934   --$todo_ptr;
935
936   $progress_is_dirty = 1;
937
938   while (!@freeslot
939          ||
940          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
941   {
942     last THISROUND if $main::please_freeze || defined($main::success);
943     if ($main::please_info)
944     {
945       $main::please_info = 0;
946       freeze();
947       create_output_collection();
948       save_meta(1);
949       update_progress_stats();
950     }
951     my $gotsome
952         = readfrompipes ()
953         + reapchildren ();
954     if (!$gotsome)
955     {
956       check_refresh_wanted();
957       check_squeue();
958       update_progress_stats();
959       select (undef, undef, undef, 0.1);
960     }
961     elsif (time - $progress_stats_updated >= 30)
962     {
963       update_progress_stats();
964     }
965     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
966         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
967     {
968       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
969           .($thisround_failed+$thisround_succeeded)
970           .") -- giving up on this round";
971       Log (undef, $message);
972       last THISROUND;
973     }
974
975     # move slots from freeslot to holdslot (or back to freeslot) if necessary
976     for (my $i=$#freeslot; $i>=0; $i--) {
977       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
978         push @holdslot, (splice @freeslot, $i, 1);
979       }
980     }
981     for (my $i=$#holdslot; $i>=0; $i--) {
982       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
983         push @freeslot, (splice @holdslot, $i, 1);
984       }
985     }
986
987     # give up if no nodes are succeeding
988     if (!grep { $_->{node}->{losing_streak} == 0 &&
989                     $_->{node}->{hold_count} < 4 } @slot) {
990       my $message = "Every node has failed -- giving up on this round";
991       Log (undef, $message);
992       last THISROUND;
993     }
994   }
995 }
996
997
998 push @freeslot, splice @holdslot;
999 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1000
1001
1002 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1003 while (%proc)
1004 {
1005   if ($main::please_continue) {
1006     $main::please_continue = 0;
1007     goto THISROUND;
1008   }
1009   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1010   readfrompipes ();
1011   if (!reapchildren())
1012   {
1013     check_refresh_wanted();
1014     check_squeue();
1015     update_progress_stats();
1016     select (undef, undef, undef, 0.1);
1017     killem (keys %proc) if $main::please_freeze;
1018   }
1019 }
1020
1021 update_progress_stats();
1022 freeze_if_want_freeze();
1023
1024
1025 if (!defined $main::success)
1026 {
1027   if (@jobstep_todo &&
1028       $thisround_succeeded == 0 &&
1029       ($thisround_failed == 0 || $thisround_failed > 4))
1030   {
1031     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1032     Log (undef, $message);
1033     $main::success = 0;
1034   }
1035   if (!@jobstep_todo)
1036   {
1037     $main::success = 1;
1038   }
1039 }
1040
1041 goto ONELEVEL if !defined $main::success;
1042
1043
1044 release_allocation();
1045 freeze();
1046 my $collated_output = &create_output_collection();
1047
1048 if (!$collated_output) {
1049   Log (undef, "Failed to write output collection");
1050 }
1051 else {
1052   Log(undef, "job output $collated_output");
1053   $Job->update_attributes('output' => $collated_output);
1054 }
1055
1056 Log (undef, "finish");
1057
1058 save_meta();
1059
1060 my $final_state;
1061 if ($collated_output && $main::success) {
1062   $final_state = 'Complete';
1063 } else {
1064   $final_state = 'Failed';
1065 }
1066 $Job->update_attributes('state' => $final_state);
1067
1068 exit (($final_state eq 'Complete') ? 0 : 1);
1069
1070
1071
1072 sub update_progress_stats
1073 {
1074   $progress_stats_updated = time;
1075   return if !$progress_is_dirty;
1076   my ($todo, $done, $running) = (scalar @jobstep_todo,
1077                                  scalar @jobstep_done,
1078                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1079   $Job->{'tasks_summary'} ||= {};
1080   $Job->{'tasks_summary'}->{'todo'} = $todo;
1081   $Job->{'tasks_summary'}->{'done'} = $done;
1082   $Job->{'tasks_summary'}->{'running'} = $running;
1083   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1084   Log (undef, "status: $done done, $running running, $todo todo");
1085   $progress_is_dirty = 0;
1086 }
1087
1088
1089
1090 sub reapchildren
1091 {
1092   my $pid = waitpid (-1, WNOHANG);
1093   return 0 if $pid <= 0;
1094
1095   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1096                   . "."
1097                   . $slot[$proc{$pid}->{slot}]->{cpu});
1098   my $jobstepid = $proc{$pid}->{jobstep};
1099   my $elapsed = time - $proc{$pid}->{time};
1100   my $Jobstep = $jobstep[$jobstepid];
1101
1102   my $childstatus = $?;
1103   my $exitvalue = $childstatus >> 8;
1104   my $exitinfo = "exit ".exit_status_s($childstatus);
1105   $Jobstep->{'arvados_task'}->reload;
1106   my $task_success = $Jobstep->{'arvados_task'}->{success};
1107
1108   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1109
1110   if (!defined $task_success) {
1111     # task did not indicate one way or the other --> fail
1112     $Jobstep->{'arvados_task'}->{success} = 0;
1113     $Jobstep->{'arvados_task'}->save;
1114     $task_success = 0;
1115   }
1116
1117   if (!$task_success)
1118   {
1119     my $temporary_fail;
1120     $temporary_fail ||= $Jobstep->{node_fail};
1121     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1122
1123     ++$thisround_failed;
1124     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1125
1126     # Check for signs of a failed or misconfigured node
1127     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1128         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1129       # Don't count this against jobstep failure thresholds if this
1130       # node is already suspected faulty and srun exited quickly
1131       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1132           $elapsed < 5) {
1133         Log ($jobstepid, "blaming failure on suspect node " .
1134              $slot[$proc{$pid}->{slot}]->{node}->{name});
1135         $temporary_fail ||= 1;
1136       }
1137       ban_node_by_slot($proc{$pid}->{slot});
1138     }
1139
1140     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1141                              ++$Jobstep->{'failures'},
1142                              $temporary_fail ? 'temporary ' : 'permanent',
1143                              $elapsed));
1144
1145     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1146       # Give up on this task, and the whole job
1147       $main::success = 0;
1148     }
1149     # Put this task back on the todo queue
1150     push @jobstep_todo, $jobstepid;
1151     $Job->{'tasks_summary'}->{'failed'}++;
1152   }
1153   else
1154   {
1155     ++$thisround_succeeded;
1156     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1157     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1158     push @jobstep_done, $jobstepid;
1159     Log ($jobstepid, "success in $elapsed seconds");
1160   }
1161   $Jobstep->{exitcode} = $childstatus;
1162   $Jobstep->{finishtime} = time;
1163   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1164   $Jobstep->{'arvados_task'}->save;
1165   process_stderr ($jobstepid, $task_success);
1166   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1167                            length($Jobstep->{'arvados_task'}->{output}),
1168                            $Jobstep->{'arvados_task'}->{output}));
1169
1170   close $reader{$jobstepid};
1171   delete $reader{$jobstepid};
1172   delete $slot[$proc{$pid}->{slot}]->{pid};
1173   push @freeslot, $proc{$pid}->{slot};
1174   delete $proc{$pid};
1175
1176   if ($task_success) {
1177     # Load new tasks
1178     my $newtask_list = [];
1179     my $newtask_results;
1180     do {
1181       $newtask_results = api_call(
1182         "job_tasks/list",
1183         'where' => {
1184           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1185         },
1186         'order' => 'qsequence',
1187         'offset' => scalar(@$newtask_list),
1188       );
1189       push(@$newtask_list, @{$newtask_results->{items}});
1190     } while (@{$newtask_results->{items}});
1191     foreach my $arvados_task (@$newtask_list) {
1192       my $jobstep = {
1193         'level' => $arvados_task->{'sequence'},
1194         'failures' => 0,
1195         'arvados_task' => $arvados_task
1196       };
1197       push @jobstep, $jobstep;
1198       push @jobstep_todo, $#jobstep;
1199     }
1200   }
1201
1202   $progress_is_dirty = 1;
1203   1;
1204 }
1205
1206 sub check_refresh_wanted
1207 {
1208   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1209   if (@stat && $stat[9] > $latest_refresh) {
1210     $latest_refresh = scalar time;
1211     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1212     for my $attr ('cancelled_at',
1213                   'cancelled_by_user_uuid',
1214                   'cancelled_by_client_uuid',
1215                   'state') {
1216       $Job->{$attr} = $Job2->{$attr};
1217     }
1218     if ($Job->{'state'} ne "Running") {
1219       if ($Job->{'state'} eq "Cancelled") {
1220         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1221       } else {
1222         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1223       }
1224       $main::success = 0;
1225       $main::please_freeze = 1;
1226     }
1227   }
1228 }
1229
1230 sub check_squeue
1231 {
1232   # return if the kill list was checked <4 seconds ago
1233   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1234   {
1235     return;
1236   }
1237   $squeue_kill_checked = time;
1238
1239   # use killem() on procs whose killtime is reached
1240   for (keys %proc)
1241   {
1242     if (exists $proc{$_}->{killtime}
1243         && $proc{$_}->{killtime} <= time)
1244     {
1245       killem ($_);
1246     }
1247   }
1248
1249   # return if the squeue was checked <60 seconds ago
1250   if (defined $squeue_checked && $squeue_checked > time - 60)
1251   {
1252     return;
1253   }
1254   $squeue_checked = time;
1255
1256   if (!$have_slurm)
1257   {
1258     # here is an opportunity to check for mysterious problems with local procs
1259     return;
1260   }
1261
1262   # get a list of steps still running
1263   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1264   chop @squeue;
1265   if ($squeue[-1] ne "ok")
1266   {
1267     return;
1268   }
1269   pop @squeue;
1270
1271   # which of my jobsteps are running, according to squeue?
1272   my %ok;
1273   foreach (@squeue)
1274   {
1275     if (/^(\d+)\.(\d+) (\S+)/)
1276     {
1277       if ($1 eq $ENV{SLURM_JOBID})
1278       {
1279         $ok{$3} = 1;
1280       }
1281     }
1282   }
1283
1284   # which of my active child procs (>60s old) were not mentioned by squeue?
1285   foreach (keys %proc)
1286   {
1287     if ($proc{$_}->{time} < time - 60
1288         && !exists $ok{$proc{$_}->{jobstepname}}
1289         && !exists $proc{$_}->{killtime})
1290     {
1291       # kill this proc if it hasn't exited in 30 seconds
1292       $proc{$_}->{killtime} = time + 30;
1293     }
1294   }
1295 }
1296
1297
1298 sub release_allocation
1299 {
1300   if ($have_slurm)
1301   {
1302     Log (undef, "release job allocation");
1303     system "scancel $ENV{SLURM_JOBID}";
1304   }
1305 }
1306
1307
1308 sub readfrompipes
1309 {
1310   my $gotsome = 0;
1311   foreach my $job (keys %reader)
1312   {
1313     my $buf;
1314     while (0 < sysread ($reader{$job}, $buf, 8192))
1315     {
1316       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1317       $jobstep[$job]->{stderr} .= $buf;
1318       preprocess_stderr ($job);
1319       if (length ($jobstep[$job]->{stderr}) > 16384)
1320       {
1321         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1322       }
1323       $gotsome = 1;
1324     }
1325   }
1326   return $gotsome;
1327 }
1328
1329
1330 sub preprocess_stderr
1331 {
1332   my $job = shift;
1333
1334   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1335     my $line = $1;
1336     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1337     Log ($job, "stderr $line");
1338     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1339       # whoa.
1340       $main::please_freeze = 1;
1341     }
1342     elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1343       $jobstep[$job]->{node_fail} = 1;
1344       ban_node_by_slot($jobstep[$job]->{slotindex});
1345     }
1346   }
1347 }
1348
1349
1350 sub process_stderr
1351 {
1352   my $job = shift;
1353   my $task_success = shift;
1354   preprocess_stderr ($job);
1355
1356   map {
1357     Log ($job, "stderr $_");
1358   } split ("\n", $jobstep[$job]->{stderr});
1359 }
1360
1361 sub fetch_block
1362 {
1363   my $hash = shift;
1364   my $keep;
1365   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1366     Log(undef, "fetch_block run error from arv-get $hash: $!");
1367     return undef;
1368   }
1369   my $output_block = "";
1370   while (1) {
1371     my $buf;
1372     my $bytes = sysread($keep, $buf, 1024 * 1024);
1373     if (!defined $bytes) {
1374       Log(undef, "fetch_block read error from arv-get: $!");
1375       $output_block = undef;
1376       last;
1377     } elsif ($bytes == 0) {
1378       # sysread returns 0 at the end of the pipe.
1379       last;
1380     } else {
1381       # some bytes were read into buf.
1382       $output_block .= $buf;
1383     }
1384   }
1385   close $keep;
1386   if ($?) {
1387     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1388     $output_block = undef;
1389   }
1390   return $output_block;
1391 }
1392
1393 # Create a collection by concatenating the output of all tasks (each
1394 # task's output is either a manifest fragment, a locator for a
1395 # manifest fragment stored in Keep, or nothing at all). Return the
1396 # portable_data_hash of the new collection.
1397 sub create_output_collection
1398 {
1399   Log (undef, "collate");
1400
1401   my ($child_out, $child_in);
1402   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1403 import arvados
1404 import sys
1405 print (arvados.api("v1").collections().
1406        create(body={"manifest_text": sys.stdin.read()}).
1407        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1408 }, retry_count());
1409
1410   my $task_idx = -1;
1411   my $manifest_size = 0;
1412   for (@jobstep)
1413   {
1414     ++$task_idx;
1415     my $output = $_->{'arvados_task'}->{output};
1416     next if (!defined($output));
1417     my $next_write;
1418     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1419       $next_write = fetch_block($output);
1420     } else {
1421       $next_write = $output;
1422     }
1423     if (defined($next_write)) {
1424       if (!defined(syswrite($child_in, $next_write))) {
1425         # There's been an error writing.  Stop the loop.
1426         # We'll log details about the exit code later.
1427         last;
1428       } else {
1429         $manifest_size += length($next_write);
1430       }
1431     } else {
1432       my $uuid = $_->{'arvados_task'}->{'uuid'};
1433       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1434       $main::success = 0;
1435     }
1436   }
1437   close($child_in);
1438   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1439
1440   my $joboutput;
1441   my $s = IO::Select->new($child_out);
1442   if ($s->can_read(120)) {
1443     sysread($child_out, $joboutput, 1024 * 1024);
1444     waitpid($pid, 0);
1445     if ($?) {
1446       Log(undef, "output collection creation exited " . exit_status_s($?));
1447       $joboutput = undef;
1448     } else {
1449       chomp($joboutput);
1450     }
1451   } else {
1452     Log (undef, "timed out while creating output collection");
1453     foreach my $signal (2, 2, 2, 15, 15, 9) {
1454       kill($signal, $pid);
1455       last if waitpid($pid, WNOHANG) == -1;
1456       sleep(1);
1457     }
1458   }
1459   close($child_out);
1460
1461   return $joboutput;
1462 }
1463
1464
1465 sub killem
1466 {
1467   foreach (@_)
1468   {
1469     my $sig = 2;                # SIGINT first
1470     if (exists $proc{$_}->{"sent_$sig"} &&
1471         time - $proc{$_}->{"sent_$sig"} > 4)
1472     {
1473       $sig = 15;                # SIGTERM if SIGINT doesn't work
1474     }
1475     if (exists $proc{$_}->{"sent_$sig"} &&
1476         time - $proc{$_}->{"sent_$sig"} > 4)
1477     {
1478       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1479     }
1480     if (!exists $proc{$_}->{"sent_$sig"})
1481     {
1482       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1483       kill $sig, $_;
1484       select (undef, undef, undef, 0.1);
1485       if ($sig == 2)
1486       {
1487         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1488       }
1489       $proc{$_}->{"sent_$sig"} = time;
1490       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1491     }
1492   }
1493 }
1494
1495
1496 sub fhbits
1497 {
1498   my($bits);
1499   for (@_) {
1500     vec($bits,fileno($_),1) = 1;
1501   }
1502   $bits;
1503 }
1504
1505
1506 # Send log output to Keep via arv-put.
1507 #
1508 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1509 # $log_pipe_pid is the pid of the arv-put subprocess.
1510 #
1511 # The only functions that should access these variables directly are:
1512 #
1513 # log_writer_start($logfilename)
1514 #     Starts an arv-put pipe, reading data on stdin and writing it to
1515 #     a $logfilename file in an output collection.
1516 #
1517 # log_writer_send($txt)
1518 #     Writes $txt to the output log collection.
1519 #
1520 # log_writer_finish()
1521 #     Closes the arv-put pipe and returns the output that it produces.
1522 #
1523 # log_writer_is_active()
1524 #     Returns a true value if there is currently a live arv-put
1525 #     process, false otherwise.
1526 #
1527 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1528
1529 sub log_writer_start($)
1530 {
1531   my $logfilename = shift;
1532   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1533                         'arv-put',
1534                         '--portable-data-hash',
1535                         '--project-uuid', $Job->{owner_uuid},
1536                         '--retries', '3',
1537                         '--name', $logfilename,
1538                         '--filename', $logfilename,
1539                         '-');
1540 }
1541
1542 sub log_writer_send($)
1543 {
1544   my $txt = shift;
1545   print $log_pipe_in $txt;
1546 }
1547
1548 sub log_writer_finish()
1549 {
1550   return unless $log_pipe_pid;
1551
1552   close($log_pipe_in);
1553   my $arv_put_output;
1554
1555   my $s = IO::Select->new($log_pipe_out);
1556   if ($s->can_read(120)) {
1557     sysread($log_pipe_out, $arv_put_output, 1024);
1558     chomp($arv_put_output);
1559   } else {
1560     Log (undef, "timed out reading from 'arv-put'");
1561   }
1562
1563   waitpid($log_pipe_pid, 0);
1564   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1565   if ($?) {
1566     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1567   }
1568
1569   return $arv_put_output;
1570 }
1571
1572 sub log_writer_is_active() {
1573   return $log_pipe_pid;
1574 }
1575
1576 sub Log                         # ($jobstep_id, $logmessage)
1577 {
1578   if ($_[1] =~ /\n/) {
1579     for my $line (split (/\n/, $_[1])) {
1580       Log ($_[0], $line);
1581     }
1582     return;
1583   }
1584   my $fh = select STDERR; $|=1; select $fh;
1585   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1586   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1587   $message .= "\n";
1588   my $datetime;
1589   if (log_writer_is_active() || -t STDERR) {
1590     my @gmtime = gmtime;
1591     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1592                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1593   }
1594   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1595
1596   if (log_writer_is_active()) {
1597     log_writer_send($datetime . " " . $message);
1598   }
1599 }
1600
1601
1602 sub croak
1603 {
1604   my ($package, $file, $line) = caller;
1605   my $message = "@_ at $file line $line\n";
1606   Log (undef, $message);
1607   freeze() if @jobstep_todo;
1608   create_output_collection() if @jobstep_todo;
1609   cleanup();
1610   save_meta();
1611   die;
1612 }
1613
1614
1615 sub cleanup
1616 {
1617   return unless $Job;
1618   if ($Job->{'state'} eq 'Cancelled') {
1619     $Job->update_attributes('finished_at' => scalar gmtime);
1620   } else {
1621     $Job->update_attributes('state' => 'Failed');
1622   }
1623 }
1624
1625
1626 sub save_meta
1627 {
1628   my $justcheckpoint = shift; # false if this will be the last meta saved
1629   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1630   return unless log_writer_is_active();
1631
1632   my $loglocator = log_writer_finish();
1633   Log (undef, "log manifest is $loglocator");
1634   $Job->{'log'} = $loglocator;
1635   $Job->update_attributes('log', $loglocator);
1636 }
1637
1638
1639 sub freeze_if_want_freeze
1640 {
1641   if ($main::please_freeze)
1642   {
1643     release_allocation();
1644     if (@_)
1645     {
1646       # kill some srun procs before freeze+stop
1647       map { $proc{$_} = {} } @_;
1648       while (%proc)
1649       {
1650         killem (keys %proc);
1651         select (undef, undef, undef, 0.1);
1652         my $died;
1653         while (($died = waitpid (-1, WNOHANG)) > 0)
1654         {
1655           delete $proc{$died};
1656         }
1657       }
1658     }
1659     freeze();
1660     create_output_collection();
1661     cleanup();
1662     save_meta();
1663     exit 1;
1664   }
1665 }
1666
1667
1668 sub freeze
1669 {
1670   Log (undef, "Freeze not implemented");
1671   return;
1672 }
1673
1674
1675 sub thaw
1676 {
1677   croak ("Thaw not implemented");
1678 }
1679
1680
1681 sub freezequote
1682 {
1683   my $s = shift;
1684   $s =~ s/\\/\\\\/g;
1685   $s =~ s/\n/\\n/g;
1686   return $s;
1687 }
1688
1689
1690 sub freezeunquote
1691 {
1692   my $s = shift;
1693   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1694   return $s;
1695 }
1696
1697
1698 sub srun
1699 {
1700   my $srunargs = shift;
1701   my $execargs = shift;
1702   my $opts = shift || {};
1703   my $stdin = shift;
1704   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1705
1706   $Data::Dumper::Terse = 1;
1707   $Data::Dumper::Indent = 0;
1708   my $show_cmd = Dumper($args);
1709   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1710   $show_cmd =~ s/\n/ /g;
1711   warn "starting: $show_cmd\n";
1712
1713   if (defined $stdin) {
1714     my $child = open STDIN, "-|";
1715     defined $child or die "no fork: $!";
1716     if ($child == 0) {
1717       print $stdin or die $!;
1718       close STDOUT or die $!;
1719       exit 0;
1720     }
1721   }
1722
1723   return system (@$args) if $opts->{fork};
1724
1725   exec @$args;
1726   warn "ENV size is ".length(join(" ",%ENV));
1727   die "exec failed: $!: @$args";
1728 }
1729
1730
1731 sub ban_node_by_slot {
1732   # Don't start any new jobsteps on this node for 60 seconds
1733   my $slotid = shift;
1734   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1735   $slot[$slotid]->{node}->{hold_count}++;
1736   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1737 }
1738
1739 sub must_lock_now
1740 {
1741   my ($lockfile, $error_message) = @_;
1742   open L, ">", $lockfile or croak("$lockfile: $!");
1743   if (!flock L, LOCK_EX|LOCK_NB) {
1744     croak("Can't lock $lockfile: $error_message\n");
1745   }
1746 }
1747
1748 sub find_docker_image {
1749   # Given a Keep locator, check to see if it contains a Docker image.
1750   # If so, return its stream name and Docker hash.
1751   # If not, return undef for both values.
1752   my $locator = shift;
1753   my ($streamname, $filename);
1754   my $image = api_call("collections/get", uuid => $locator);
1755   if ($image) {
1756     foreach my $line (split(/\n/, $image->{manifest_text})) {
1757       my @tokens = split(/\s+/, $line);
1758       next if (!@tokens);
1759       $streamname = shift(@tokens);
1760       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1761         if (defined($filename)) {
1762           return (undef, undef);  # More than one file in the Collection.
1763         } else {
1764           $filename = (split(/:/, $filedata, 3))[2];
1765         }
1766       }
1767     }
1768   }
1769   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1770     return ($streamname, $1);
1771   } else {
1772     return (undef, undef);
1773   }
1774 }
1775
1776 sub retry_count {
1777   # Calculate the number of times an operation should be retried,
1778   # assuming exponential backoff, and that we're willing to retry as
1779   # long as tasks have been running.  Enforce a minimum of 3 retries.
1780   my ($starttime, $endtime, $timediff, $retries);
1781   if (@jobstep) {
1782     $starttime = $jobstep[0]->{starttime};
1783     $endtime = $jobstep[-1]->{finishtime};
1784   }
1785   if (!defined($starttime)) {
1786     $timediff = 0;
1787   } elsif (!defined($endtime)) {
1788     $timediff = time - $starttime;
1789   } else {
1790     $timediff = ($endtime - $starttime) - (time - $endtime);
1791   }
1792   if ($timediff > 0) {
1793     $retries = int(log($timediff) / log(2));
1794   } else {
1795     $retries = 1;  # Use the minimum.
1796   }
1797   return ($retries > 3) ? $retries : 3;
1798 }
1799
1800 sub retry_op {
1801   # Pass in two function references.
1802   # This method will be called with the remaining arguments.
1803   # If it dies, retry it with exponential backoff until it succeeds,
1804   # or until the current retry_count is exhausted.  After each failure
1805   # that can be retried, the second function will be called with
1806   # the current try count (0-based), next try time, and error message.
1807   my $operation = shift;
1808   my $retry_callback = shift;
1809   my $retries = retry_count();
1810   foreach my $try_count (0..$retries) {
1811     my $next_try = time + (2 ** $try_count);
1812     my $result = eval { $operation->(@_); };
1813     if (!$@) {
1814       return $result;
1815     } elsif ($try_count < $retries) {
1816       $retry_callback->($try_count, $next_try, $@);
1817       my $sleep_time = $next_try - time;
1818       sleep($sleep_time) if ($sleep_time > 0);
1819     }
1820   }
1821   # Ensure the error message ends in a newline, so Perl doesn't add
1822   # retry_op's line number to it.
1823   chomp($@);
1824   die($@ . "\n");
1825 }
1826
1827 sub api_call {
1828   # Pass in a /-separated API method name, and arguments for it.
1829   # This function will call that method, retrying as needed until
1830   # the current retry_count is exhausted, with a log on the first failure.
1831   my $method_name = shift;
1832   my $log_api_retry = sub {
1833     my ($try_count, $next_try_at, $errmsg) = @_;
1834     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1835     $errmsg =~ s/\s/ /g;
1836     $errmsg =~ s/\s+$//;
1837     my $retry_msg;
1838     if ($next_try_at < time) {
1839       $retry_msg = "Retrying.";
1840     } else {
1841       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1842       $retry_msg = "Retrying at $next_try_fmt.";
1843     }
1844     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1845   };
1846   my $method = $arv;
1847   foreach my $key (split(/\//, $method_name)) {
1848     $method = $method->{$key};
1849   }
1850   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1851 }
1852
1853 sub exit_status_s {
1854   # Given a $?, return a human-readable exit code string like "0" or
1855   # "1" or "0 with signal 1" or "1 with signal 11".
1856   my $exitcode = shift;
1857   my $s = $exitcode >> 8;
1858   if ($exitcode & 0x7f) {
1859     $s .= " with signal " . ($exitcode & 0x7f);
1860   }
1861   if ($exitcode & 0x80) {
1862     $s .= " with core dump";
1863   }
1864   return $s;
1865 }
1866
1867 sub handle_readall {
1868   # Pass in a glob reference to a file handle.
1869   # Read all its contents and return them as a string.
1870   my $fh_glob_ref = shift;
1871   local $/ = undef;
1872   return <$fh_glob_ref>;
1873 }
1874
1875 sub tar_filename_n {
1876   my $n = shift;
1877   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1878 }
1879
1880 sub add_git_archive {
1881   # Pass in a git archive command as a string or list, a la system().
1882   # This method will save its output to be included in the archive sent to the
1883   # build script.
1884   my $git_input;
1885   $git_tar_count++;
1886   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1887     croak("Failed to save git archive: $!");
1888   }
1889   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1890   close($git_input);
1891   waitpid($git_pid, 0);
1892   close(GIT_ARCHIVE);
1893   if ($?) {
1894     croak("Failed to save git archive: git exited " . exit_status_s($?));
1895   }
1896 }
1897
1898 sub combined_git_archive {
1899   # Combine all saved tar archives into a single archive, then return its
1900   # contents in a string.  Return undef if no archives have been saved.
1901   if ($git_tar_count < 1) {
1902     return undef;
1903   }
1904   my $base_tar_name = tar_filename_n(1);
1905   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1906     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1907     if ($tar_exit != 0) {
1908       croak("Error preparing build archive: tar -A exited " .
1909             exit_status_s($tar_exit));
1910     }
1911   }
1912   if (!open(GIT_TAR, "<", $base_tar_name)) {
1913     croak("Could not open build archive: $!");
1914   }
1915   my $tar_contents = handle_readall(\*GIT_TAR);
1916   close(GIT_TAR);
1917   return $tar_contents;
1918 }
1919
1920 sub set_nonblocking {
1921   my $fh = shift;
1922   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
1923   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
1924 }
1925
1926 __DATA__
1927 #!/usr/bin/perl
1928 #
1929 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1930 # server invokes this script on individual compute nodes, or localhost if we're
1931 # running a job locally.  It gets called in two modes:
1932 #
1933 # * No arguments: Installation mode.  Read a tar archive from the DATA
1934 #   file handle; it includes the Crunch script's source code, and
1935 #   maybe SDKs as well.  Those should be installed in the proper
1936 #   locations.  This runs outside of any Docker container, so don't try to
1937 #   introspect Crunch's runtime environment.
1938 #
1939 # * With arguments: Crunch script run mode.  This script should set up the
1940 #   environment, then run the command specified in the arguments.  This runs
1941 #   inside any Docker container.
1942
1943 use Fcntl ':flock';
1944 use File::Path qw( make_path remove_tree );
1945 use POSIX qw(getcwd);
1946
1947 use constant TASK_TEMPFAIL => 111;
1948
1949 # Map SDK subdirectories to the path environments they belong to.
1950 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1951
1952 my $destdir = $ENV{"CRUNCH_SRC"};
1953 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
1954 my $repo = $ENV{"CRUNCH_SRC_URL"};
1955 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1956 my $job_work = $ENV{"JOB_WORK"};
1957 my $task_work = $ENV{"TASK_WORK"};
1958
1959 open(STDOUT_ORIG, ">&", STDOUT);
1960 open(STDERR_ORIG, ">&", STDERR);
1961
1962 for my $dir ($destdir, $job_work, $task_work) {
1963   if ($dir) {
1964     make_path $dir;
1965     -e $dir or die "Failed to create temporary directory ($dir): $!";
1966   }
1967 }
1968
1969 if ($task_work) {
1970   remove_tree($task_work, {keep_root => 1});
1971 }
1972
1973 ### Crunch script run mode
1974 if (@ARGV) {
1975   # We want to do routine logging during task 0 only.  This gives the user
1976   # the information they need, but avoids repeating the information for every
1977   # task.
1978   my $Log;
1979   if ($ENV{TASK_SEQUENCE} eq "0") {
1980     $Log = sub {
1981       my $msg = shift;
1982       printf STDERR_ORIG "[Crunch] $msg\n", @_;
1983     };
1984   } else {
1985     $Log = sub { };
1986   }
1987
1988   my $python_src = "$install_dir/python";
1989   my $venv_dir = "$job_work/.arvados.venv";
1990   my $venv_built = -e "$venv_dir/bin/activate";
1991   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
1992     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
1993                  "--python=python2.7", $venv_dir);
1994     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
1995     $venv_built = 1;
1996     $Log->("Built Python SDK virtualenv");
1997   }
1998
1999   my $pip_bin = "pip";
2000   if ($venv_built) {
2001     $Log->("Running in Python SDK virtualenv");
2002     $pip_bin = "$venv_dir/bin/pip";
2003     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2004     @ARGV = ("/bin/sh", "-ec",
2005              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2006   } elsif (-d $python_src) {
2007     $Log->("Warning: virtualenv not found inside Docker container default " .
2008            "\$PATH. Can't install Python SDK.");
2009   }
2010
2011   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2012   if ($pkgs) {
2013     $Log->("Using Arvados SDK:");
2014     foreach my $line (split /\n/, $pkgs) {
2015       $Log->($line);
2016     }
2017   } else {
2018     $Log->("Arvados SDK packages not found");
2019   }
2020
2021   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2022     my $sdk_path = "$install_dir/$sdk_dir";
2023     if (-d $sdk_path) {
2024       if ($ENV{$sdk_envkey}) {
2025         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2026       } else {
2027         $ENV{$sdk_envkey} = $sdk_path;
2028       }
2029       $Log->("Arvados SDK added to %s", $sdk_envkey);
2030     }
2031   }
2032
2033   exec(@ARGV);
2034   die "Cannot exec `@ARGV`: $!";
2035 }
2036
2037 ### Installation mode
2038 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2039 flock L, LOCK_EX;
2040 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2041   # This exact git archive (source + arvados sdk) is already installed
2042   # here, so there's no need to reinstall it.
2043
2044   # We must consume our DATA section, though: otherwise the process
2045   # feeding it to us will get SIGPIPE.
2046   my $buf;
2047   while (read(DATA, $buf, 65536)) { }
2048
2049   exit(0);
2050 }
2051
2052 unlink "$destdir.archive_hash";
2053 mkdir $destdir;
2054
2055 if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
2056   die "Error launching 'tar -xC $destdir': $!";
2057 }
2058 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2059 # get SIGPIPE.  We must feed it data incrementally.
2060 my $tar_input;
2061 while (read(DATA, $tar_input, 65536)) {
2062   print TARX $tar_input;
2063 }
2064 if(!close(TARX)) {
2065   die "'tar -xC $destdir' exited $?: $!";
2066 }
2067
2068 mkdir $install_dir;
2069
2070 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2071 if (-d $sdk_root) {
2072   foreach my $sdk_lang (("python",
2073                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2074     if (-d "$sdk_root/$sdk_lang") {
2075       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2076         die "Failed to install $sdk_lang SDK: $!";
2077       }
2078     }
2079   }
2080 }
2081
2082 my $python_dir = "$install_dir/python";
2083 if ((-d $python_dir) and can_run("python2.7") and
2084     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
2085   # egg_info failed, probably when it asked git for a build tag.
2086   # Specify no build tag.
2087   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2088   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2089   close($pysdk_cfg);
2090 }
2091
2092 # Hide messages from the install script (unless it fails: shell_or_die
2093 # will show $destdir.log in that case).
2094 open(STDOUT, ">>", "$destdir.log");
2095 open(STDERR, ">&", STDOUT);
2096
2097 if (-e "$destdir/crunch_scripts/install") {
2098     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2099 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2100     # Old version
2101     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2102 } elsif (-e "./install.sh") {
2103     shell_or_die (undef, "./install.sh", $install_dir);
2104 }
2105
2106 if ($archive_hash) {
2107     unlink "$destdir.archive_hash.new";
2108     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2109     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2110 }
2111
2112 close L;
2113
2114 sub can_run {
2115   my $command_name = shift;
2116   open(my $which, "-|", "which", $command_name);
2117   while (<$which>) { }
2118   close($which);
2119   return ($? == 0);
2120 }
2121
2122 sub shell_or_die
2123 {
2124   my $exitcode = shift;
2125
2126   if ($ENV{"DEBUG"}) {
2127     print STDERR "@_\n";
2128   }
2129   if (system (@_) != 0) {
2130     my $err = $!;
2131     my $code = $?;
2132     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2133     open STDERR, ">&STDERR_ORIG";
2134     system ("cat $destdir.log >&2");
2135     warn "@_ failed ($err): $exitstatus";
2136     if (defined($exitcode)) {
2137       exit $exitcode;
2138     }
2139     else {
2140       exit (($code >> 8) || 1);
2141     }
2142   }
2143 }
2144
2145 __DATA__