Merge branch 'master' into 6194-python-arvfile-large-write
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101
102 $ENV{"TMPDIR"} ||= "/tmp";
103 unless (defined $ENV{"CRUNCH_TMP"}) {
104   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
105   if ($ENV{"USER"} ne "crunch" && $< != 0) {
106     # use a tmp dir unique for my uid
107     $ENV{"CRUNCH_TMP"} .= "-$<";
108   }
109 }
110
111 # Create the tmp directory if it does not exist
112 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
113   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
114 }
115
116 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
117 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
118 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
119 mkdir ($ENV{"JOB_WORK"});
120
121 my %proc;
122 my $force_unlock;
123 my $git_dir;
124 my $jobspec;
125 my $job_api_token;
126 my $no_clear_tmp;
127 my $resume_stash;
128 my $docker_bin = "/usr/bin/docker.io";
129 GetOptions('force-unlock' => \$force_unlock,
130            'git-dir=s' => \$git_dir,
131            'job=s' => \$jobspec,
132            'job-api-token=s' => \$job_api_token,
133            'no-clear-tmp' => \$no_clear_tmp,
134            'resume-stash=s' => \$resume_stash,
135            'docker-bin=s' => \$docker_bin,
136     );
137
138 if (defined $job_api_token) {
139   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
140 }
141
142 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
143
144
145 $SIG{'USR1'} = sub
146 {
147   $main::ENV{CRUNCH_DEBUG} = 1;
148 };
149 $SIG{'USR2'} = sub
150 {
151   $main::ENV{CRUNCH_DEBUG} = 0;
152 };
153
154 my $arv = Arvados->new('apiVersion' => 'v1');
155
156 my $Job;
157 my $job_id;
158 my $dbh;
159 my $sth;
160 my @jobstep;
161
162 my $local_job;
163 if ($jobspec =~ /^[-a-z\d]+$/)
164 {
165   # $jobspec is an Arvados UUID, not a JSON job specification
166   $Job = api_call("jobs/get", uuid => $jobspec);
167   $local_job = 0;
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172   $local_job = 1;
173 }
174
175
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely
178 # misconfigured.
179 my $cmd = ['true'];
180 if ($Job->{docker_image_locator}) {
181   $cmd = [$docker_bin, 'ps', '-q'];
182 }
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
185      $cmd,
186      {fork => 1});
187 if ($? != 0) {
188   Log(undef, "Sanity check failed: ".exit_status_s($?));
189   exit EX_TEMPFAIL;
190 }
191 Log(undef, "Sanity check OK");
192
193
194 my $User = api_call("users/current");
195
196 if (!$local_job) {
197   if (!$force_unlock) {
198     # Claim this job, and make sure nobody else does
199     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
200     if ($@) {
201       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
202       exit EX_TEMPFAIL;
203     };
204   }
205 }
206 else
207 {
208   if (!$resume_stash)
209   {
210     map { croak ("No $_ specified") unless $Job->{$_} }
211     qw(script script_version script_parameters);
212   }
213
214   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
215   $Job->{'started_at'} = gmtime;
216   $Job->{'state'} = 'Running';
217
218   $Job = api_call("jobs/create", job => $Job);
219 }
220 $job_id = $Job->{'uuid'};
221
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
224
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
228
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
230 if ($? == 0) {
231   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232   chomp($gem_versions);
233   chop($gem_versions);  # Closing parentheses
234 } else {
235   $gem_versions = "";
236 }
237 Log(undef,
238     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
239
240 Log (undef, "check slurm allocation");
241 my @slot;
242 my @node;
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
244 my @sinfo;
245 if (!$have_slurm)
246 {
247   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248   push @sinfo, "$localcpus localhost";
249 }
250 if (exists $ENV{SLURM_NODELIST})
251 {
252   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
253 }
254 foreach (@sinfo)
255 {
256   my ($ncpus, $slurm_nodelist) = split;
257   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
258
259   my @nodelist;
260   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
261   {
262     my $nodelist = $1;
263     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
264     {
265       my $ranges = $1;
266       foreach (split (",", $ranges))
267       {
268         my ($a, $b);
269         if (/(\d+)-(\d+)/)
270         {
271           $a = $1;
272           $b = $2;
273         }
274         else
275         {
276           $a = $_;
277           $b = $_;
278         }
279         push @nodelist, map {
280           my $n = $nodelist;
281           $n =~ s/\[[-,\d]+\]/$_/;
282           $n;
283         } ($a..$b);
284       }
285     }
286     else
287     {
288       push @nodelist, $nodelist;
289     }
290   }
291   foreach my $nodename (@nodelist)
292   {
293     Log (undef, "node $nodename - $ncpus slots");
294     my $node = { name => $nodename,
295                  ncpus => $ncpus,
296                  losing_streak => 0,
297                  hold_until => 0 };
298     foreach my $cpu (1..$ncpus)
299     {
300       push @slot, { node => $node,
301                     cpu => $cpu };
302     }
303   }
304   push @node, @nodelist;
305 }
306
307
308
309 # Ensure that we get one jobstep running on each allocated node before
310 # we start overloading nodes with concurrent steps
311
312 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
313
314
315 $Job->update_attributes(
316   'tasks_summary' => { 'failed' => 0,
317                        'todo' => 1,
318                        'running' => 0,
319                        'done' => 0 });
320
321 Log (undef, "start");
322 $SIG{'INT'} = sub { $main::please_freeze = 1; };
323 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
324 $SIG{'TERM'} = \&croak;
325 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
326 $SIG{'ALRM'} = sub { $main::please_info = 1; };
327 $SIG{'CONT'} = sub { $main::please_continue = 1; };
328 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
329
330 $main::please_freeze = 0;
331 $main::please_info = 0;
332 $main::please_continue = 0;
333 $main::please_refresh = 0;
334 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
335
336 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
337 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
338 $ENV{"JOB_UUID"} = $job_id;
339
340
341 my @jobstep_todo = ();
342 my @jobstep_done = ();
343 my @jobstep_tomerge = ();
344 my $jobstep_tomerge_level = 0;
345 my $squeue_checked;
346 my $squeue_kill_checked;
347 my $latest_refresh = scalar time;
348
349
350
351 if (defined $Job->{thawedfromkey})
352 {
353   thaw ($Job->{thawedfromkey});
354 }
355 else
356 {
357   my $first_task = api_call("job_tasks/create", job_task => {
358     'job_uuid' => $Job->{'uuid'},
359     'sequence' => 0,
360     'qsequence' => 0,
361     'parameters' => {},
362   });
363   push @jobstep, { 'level' => 0,
364                    'failures' => 0,
365                    'arvados_task' => $first_task,
366                  };
367   push @jobstep_todo, 0;
368 }
369
370
371 if (!$have_slurm)
372 {
373   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
374 }
375
376 my $build_script = handle_readall(\*DATA);
377 my $nodelist = join(",", @node);
378 my $git_tar_count = 0;
379
380 if (!defined $no_clear_tmp) {
381   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
382   Log (undef, "Clean work dirs");
383
384   my $cleanpid = fork();
385   if ($cleanpid == 0)
386   {
387     # Find FUSE mounts that look like Keep mounts (the mount path has the
388     # word "keep") and unmount them.  Then clean up work directories.
389     # TODO: When #5036 is done and widely deployed, we can get rid of the
390     # regular expression and just unmount everything with type fuse.keep.
391     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
392           ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
393     exit (1);
394   }
395   while (1)
396   {
397     last if $cleanpid == waitpid (-1, WNOHANG);
398     freeze_if_want_freeze ($cleanpid);
399     select (undef, undef, undef, 0.1);
400   }
401   Log (undef, "Cleanup command exited ".exit_status_s($?));
402 }
403
404 # If this job requires a Docker image, install that.
405 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
406 if ($docker_locator = $Job->{docker_image_locator}) {
407   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
408   if (!$docker_hash)
409   {
410     croak("No Docker image hash found from locator $docker_locator");
411   }
412   $docker_stream =~ s/^\.//;
413   my $docker_install_script = qq{
414 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
415     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
416 fi
417 };
418   my $docker_pid = fork();
419   if ($docker_pid == 0)
420   {
421     srun (["srun", "--nodelist=" . join(',', @node)],
422           ["/bin/sh", "-ec", $docker_install_script]);
423     exit ($?);
424   }
425   while (1)
426   {
427     last if $docker_pid == waitpid (-1, WNOHANG);
428     freeze_if_want_freeze ($docker_pid);
429     select (undef, undef, undef, 0.1);
430   }
431   if ($? != 0)
432   {
433     croak("Installing Docker image from $docker_locator exited "
434           .exit_status_s($?));
435   }
436
437   # Determine whether this version of Docker supports memory+swap limits.
438   srun(["srun", "--nodelist=" . $node[0]],
439        ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
440       {fork => 1});
441   $docker_limitmem = ($? == 0);
442
443   if ($Job->{arvados_sdk_version}) {
444     # The job also specifies an Arvados SDK version.  Add the SDKs to the
445     # tar file for the build script to install.
446     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
447                        $Job->{arvados_sdk_version}));
448     add_git_archive("git", "--git-dir=$git_dir", "archive",
449                     "--prefix=.arvados.sdk/",
450                     $Job->{arvados_sdk_version}, "sdk");
451   }
452 }
453
454 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
455   # If script_version looks like an absolute path, *and* the --git-dir
456   # argument was not given -- which implies we were not invoked by
457   # crunch-dispatch -- we will use the given path as a working
458   # directory instead of resolving script_version to a git commit (or
459   # doing anything else with git).
460   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
461   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
462 }
463 else {
464   # Resolve the given script_version to a git commit sha1. Also, if
465   # the repository is remote, clone it into our local filesystem: this
466   # ensures "git archive" will work, and is necessary to reliably
467   # resolve a symbolic script_version like "master^".
468   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
469
470   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
471
472   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
473
474   # If we're running under crunch-dispatch, it will have already
475   # pulled the appropriate source tree into its own repository, and
476   # given us that repo's path as $git_dir.
477   #
478   # If we're running a "local" job, we might have to fetch content
479   # from a remote repository.
480   #
481   # (Currently crunch-dispatch gives a local path with --git-dir, but
482   # we might as well accept URLs there too in case it changes its
483   # mind.)
484   my $repo = $git_dir || $Job->{'repository'};
485
486   # Repository can be remote or local. If remote, we'll need to fetch it
487   # to a local dir before doing `git log` et al.
488   my $repo_location;
489
490   if ($repo =~ m{://|^[^/]*:}) {
491     # $repo is a git url we can clone, like git:// or https:// or
492     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
493     # not recognized here because distinguishing that from a local
494     # path is too fragile. If you really need something strange here,
495     # use the ssh:// form.
496     $repo_location = 'remote';
497   } elsif ($repo =~ m{^\.*/}) {
498     # $repo is a local path to a git index. We'll also resolve ../foo
499     # to ../foo/.git if the latter is a directory. To help
500     # disambiguate local paths from named hosted repositories, this
501     # form must be given as ./ or ../ if it's a relative path.
502     if (-d "$repo/.git") {
503       $repo = "$repo/.git";
504     }
505     $repo_location = 'local';
506   } else {
507     # $repo is none of the above. It must be the name of a hosted
508     # repository.
509     my $arv_repo_list = api_call("repositories/list",
510                                  'filters' => [['name','=',$repo]]);
511     my @repos_found = @{$arv_repo_list->{'items'}};
512     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
513     if ($n_found > 0) {
514       Log(undef, "Repository '$repo' -> "
515           . join(", ", map { $_->{'uuid'} } @repos_found));
516     }
517     if ($n_found != 1) {
518       croak("Error: Found $n_found repositories with name '$repo'.");
519     }
520     $repo = $repos_found[0]->{'fetch_url'};
521     $repo_location = 'remote';
522   }
523   Log(undef, "Using $repo_location repository '$repo'");
524   $ENV{"CRUNCH_SRC_URL"} = $repo;
525
526   # Resolve given script_version (we'll call that $treeish here) to a
527   # commit sha1 ($commit).
528   my $treeish = $Job->{'script_version'};
529   my $commit;
530   if ($repo_location eq 'remote') {
531     # We minimize excess object-fetching by re-using the same bare
532     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
533     # just keep adding remotes to it as needed.
534     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
535     my $gitcmd = "git --git-dir=\Q$local_repo\E";
536
537     # Set up our local repo for caching remote objects, making
538     # archives, etc.
539     if (!-d $local_repo) {
540       make_path($local_repo) or croak("Error: could not create $local_repo");
541     }
542     # This works (exits 0 and doesn't delete fetched objects) even
543     # if $local_repo is already initialized:
544     `$gitcmd init --bare`;
545     if ($?) {
546       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
547     }
548
549     # If $treeish looks like a hash (or abbrev hash) we look it up in
550     # our local cache first, since that's cheaper. (We don't want to
551     # do that with tags/branches though -- those change over time, so
552     # they should always be resolved by the remote repo.)
553     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
554       # Hide stderr because it's normal for this to fail:
555       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
556       if ($? == 0 &&
557           # Careful not to resolve a branch named abcdeff to commit 1234567:
558           $sha1 =~ /^$treeish/ &&
559           $sha1 =~ /^([0-9a-f]{40})$/s) {
560         $commit = $1;
561         Log(undef, "Commit $commit already present in $local_repo");
562       }
563     }
564
565     if (!defined $commit) {
566       # If $treeish isn't just a hash or abbrev hash, or isn't here
567       # yet, we need to fetch the remote to resolve it correctly.
568
569       # First, remove all local heads. This prevents a name that does
570       # not exist on the remote from resolving to (or colliding with)
571       # a previously fetched branch or tag (possibly from a different
572       # remote).
573       remove_tree("$local_repo/refs/heads", {keep_root => 1});
574
575       Log(undef, "Fetching objects from $repo to $local_repo");
576       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
577       if ($?) {
578         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
579       }
580     }
581
582     # Now that the data is all here, we will use our local repo for
583     # the rest of our git activities.
584     $repo = $local_repo;
585   }
586
587   my $gitcmd = "git --git-dir=\Q$repo\E";
588   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
589   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
590     croak("`$gitcmd rev-list` exited "
591           .exit_status_s($?)
592           .", '$treeish' not found. Giving up.");
593   }
594   $commit = $1;
595   Log(undef, "Version $treeish is commit $commit");
596
597   if ($commit ne $Job->{'script_version'}) {
598     # Record the real commit id in the database, frozentokey, logs,
599     # etc. -- instead of an abbreviation or a branch name which can
600     # become ambiguous or point to a different commit in the future.
601     if (!$Job->update_attributes('script_version' => $commit)) {
602       croak("Error: failed to update job's script_version attribute");
603     }
604   }
605
606   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
607   add_git_archive("$gitcmd archive ''\Q$commit\E");
608 }
609
610 my $git_archive = combined_git_archive();
611 if (!defined $git_archive) {
612   Log(undef, "Skip install phase (no git archive)");
613   if ($have_slurm) {
614     Log(undef, "Warning: This probably means workers have no source tree!");
615   }
616 }
617 else {
618   my $install_exited;
619   my $install_script_tries_left = 3;
620   for (my $attempts = 0; $attempts < 3; $attempts++) {
621     Log(undef, "Run install script on all workers");
622
623     my @srunargs = ("srun",
624                     "--nodelist=$nodelist",
625                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
626     my @execargs = ("sh", "-c",
627                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
628
629     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
630     my ($install_stderr_r, $install_stderr_w);
631     pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
632     set_nonblocking($install_stderr_r);
633     my $installpid = fork();
634     if ($installpid == 0)
635     {
636       close($install_stderr_r);
637       fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
638       open(STDOUT, ">&", $install_stderr_w);
639       open(STDERR, ">&", $install_stderr_w);
640       srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
641       exit (1);
642     }
643     close($install_stderr_w);
644     # Tell freeze_if_want_freeze how to kill the child, otherwise the
645     # "waitpid(installpid)" loop won't get interrupted by a freeze:
646     $proc{$installpid} = {};
647     my $stderr_buf = '';
648     # Track whether anything appears on stderr other than slurm errors
649     # ("srun: ...") and the "starting: ..." message printed by the
650     # srun subroutine itself:
651     my $stderr_anything_from_script = 0;
652     my $match_our_own_errors = '^(srun: error: |starting: \[)';
653     while ($installpid != waitpid(-1, WNOHANG)) {
654       freeze_if_want_freeze ($installpid);
655       # Wait up to 0.1 seconds for something to appear on stderr, then
656       # do a non-blocking read.
657       my $bits = fhbits($install_stderr_r);
658       select ($bits, undef, $bits, 0.1);
659       if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
660       {
661         while ($stderr_buf =~ /^(.*?)\n/) {
662           my $line = $1;
663           substr $stderr_buf, 0, 1+length($line), "";
664           Log(undef, "stderr $line");
665           if ($line !~ /$match_our_own_errors/) {
666             $stderr_anything_from_script = 1;
667           }
668         }
669       }
670     }
671     delete $proc{$installpid};
672     $install_exited = $?;
673     close($install_stderr_r);
674     if (length($stderr_buf) > 0) {
675       if ($stderr_buf !~ /$match_our_own_errors/) {
676         $stderr_anything_from_script = 1;
677       }
678       Log(undef, "stderr $stderr_buf")
679     }
680
681     Log (undef, "Install script exited ".exit_status_s($install_exited));
682     last if $install_exited == 0 || $main::please_freeze;
683     # If the install script fails but doesn't print an error message,
684     # the next thing anyone is likely to do is just run it again in
685     # case it was a transient problem like "slurm communication fails
686     # because the network isn't reliable enough". So we'll just do
687     # that ourselves (up to 3 attempts in total). OTOH, if there is an
688     # error message, the problem is more likely to have a real fix and
689     # we should fail the job so the fixing process can start, instead
690     # of doing 2 more attempts.
691     last if $stderr_anything_from_script;
692   }
693
694   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
695     unlink($tar_filename);
696   }
697
698   if ($install_exited != 0) {
699     croak("Giving up");
700   }
701 }
702
703 foreach (qw (script script_version script_parameters runtime_constraints))
704 {
705   Log (undef,
706        "$_ " .
707        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
708 }
709 foreach (split (/\n/, $Job->{knobs}))
710 {
711   Log (undef, "knob " . $_);
712 }
713
714
715
716 $main::success = undef;
717
718
719
720 ONELEVEL:
721
722 my $thisround_succeeded = 0;
723 my $thisround_failed = 0;
724 my $thisround_failed_multiple = 0;
725
726 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
727                        or $a <=> $b } @jobstep_todo;
728 my $level = $jobstep[$jobstep_todo[0]]->{level};
729
730 my $initial_tasks_this_level = 0;
731 foreach my $id (@jobstep_todo) {
732   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
733 }
734
735 # If the number of tasks scheduled at this level #T is smaller than the number
736 # of slots available #S, only use the first #T slots, or the first slot on
737 # each node, whichever number is greater.
738 #
739 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
740 # based on these numbers.  Using fewer slots makes more resources available
741 # to each individual task, which should normally be a better strategy when
742 # there are fewer of them running with less parallelism.
743 #
744 # Note that this calculation is not redone if the initial tasks at
745 # this level queue more tasks at the same level.  This may harm
746 # overall task throughput for that level.
747 my @freeslot;
748 if ($initial_tasks_this_level < @node) {
749   @freeslot = (0..$#node);
750 } elsif ($initial_tasks_this_level < @slot) {
751   @freeslot = (0..$initial_tasks_this_level - 1);
752 } else {
753   @freeslot = (0..$#slot);
754 }
755 my $round_num_freeslots = scalar(@freeslot);
756
757 my %round_max_slots = ();
758 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
759   my $this_slot = $slot[$freeslot[$ii]];
760   my $node_name = $this_slot->{node}->{name};
761   $round_max_slots{$node_name} ||= $this_slot->{cpu};
762   last if (scalar(keys(%round_max_slots)) >= @node);
763 }
764
765 Log(undef, "start level $level with $round_num_freeslots slots");
766 my @holdslot;
767 my %reader;
768 my $progress_is_dirty = 1;
769 my $progress_stats_updated = 0;
770
771 update_progress_stats();
772
773
774 THISROUND:
775 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
776 {
777   my $id = $jobstep_todo[$todo_ptr];
778   my $Jobstep = $jobstep[$id];
779   if ($Jobstep->{level} != $level)
780   {
781     next;
782   }
783
784   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
785   set_nonblocking($reader{$id});
786
787   my $childslot = $freeslot[0];
788   my $childnode = $slot[$childslot]->{node};
789   my $childslotname = join (".",
790                             $slot[$childslot]->{node}->{name},
791                             $slot[$childslot]->{cpu});
792
793   my $childpid = fork();
794   if ($childpid == 0)
795   {
796     $SIG{'INT'} = 'DEFAULT';
797     $SIG{'QUIT'} = 'DEFAULT';
798     $SIG{'TERM'} = 'DEFAULT';
799
800     foreach (values (%reader))
801     {
802       close($_);
803     }
804     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
805     open(STDOUT,">&writer");
806     open(STDERR,">&writer");
807
808     undef $dbh;
809     undef $sth;
810
811     delete $ENV{"GNUPGHOME"};
812     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
813     $ENV{"TASK_QSEQUENCE"} = $id;
814     $ENV{"TASK_SEQUENCE"} = $level;
815     $ENV{"JOB_SCRIPT"} = $Job->{script};
816     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
817       $param =~ tr/a-z/A-Z/;
818       $ENV{"JOB_PARAMETER_$param"} = $value;
819     }
820     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
821     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
822     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
823     $ENV{"HOME"} = $ENV{"TASK_WORK"};
824     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
825     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
826     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
827     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
828
829     $ENV{"GZIP"} = "-n";
830
831     my @srunargs = (
832       "srun",
833       "--nodelist=".$childnode->{name},
834       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
835       "--job-name=$job_id.$id.$$",
836         );
837     my $command =
838         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
839         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
840         ."&& cd $ENV{CRUNCH_TMP} "
841         # These environment variables get used explicitly later in
842         # $command.  No tool is expected to read these values directly.
843         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
844         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
845         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
846         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
847     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
848     if ($docker_hash)
849     {
850       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
851       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
852       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
853       # We only set memory limits if Docker lets us limit both memory and swap.
854       # Memory limits alone have been supported longer, but subprocesses tend
855       # to get SIGKILL if they exceed that without any swap limit set.
856       # See #5642 for additional background.
857       if ($docker_limitmem) {
858         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
859       }
860
861       # Dynamically configure the container to use the host system as its
862       # DNS server.  Get the host's global addresses from the ip command,
863       # and turn them into docker --dns options using gawk.
864       $command .=
865           q{$(ip -o address show scope global |
866               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
867
868       # The source tree and $destdir directory (which we have
869       # installed on the worker host) are available in the container,
870       # under the same path.
871       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
872       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
873
874       # Currently, we make arv-mount's mount point appear at /keep
875       # inside the container (instead of using the same path as the
876       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
877       # crunch scripts and utilities must not rely on this. They must
878       # use $TASK_KEEPMOUNT.
879       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
880       $ENV{TASK_KEEPMOUNT} = "/keep";
881
882       # TASK_WORK is almost exactly like a docker data volume: it
883       # starts out empty, is writable, and persists until no
884       # containers use it any more. We don't use --volumes-from to
885       # share it with other containers: it is only accessible to this
886       # task, and it goes away when this task stops.
887       #
888       # However, a docker data volume is writable only by root unless
889       # the mount point already happens to exist in the container with
890       # different permissions. Therefore, we [1] assume /tmp already
891       # exists in the image and is writable by the crunch user; [2]
892       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
893       # writable if they are created by docker while setting up the
894       # other --volumes); and [3] create $TASK_WORK inside the
895       # container using $build_script.
896       $command .= "--volume=/tmp ";
897       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
898       $ENV{"HOME"} = $ENV{"TASK_WORK"};
899       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
900
901       # TODO: Share a single JOB_WORK volume across all task
902       # containers on a given worker node, and delete it when the job
903       # ends (and, in case that doesn't work, when the next job
904       # starts).
905       #
906       # For now, use the same approach as TASK_WORK above.
907       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
908
909       while (my ($env_key, $env_val) = each %ENV)
910       {
911         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
912           $command .= "--env=\Q$env_key=$env_val\E ";
913         }
914       }
915       $command .= "--env=\QHOME=$ENV{HOME}\E ";
916       $command .= "\Q$docker_hash\E ";
917       $command .= "stdbuf --output=0 --error=0 ";
918       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
919     } else {
920       # Non-docker run
921       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
922       $command .= "stdbuf --output=0 --error=0 ";
923       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
924     }
925
926     my @execargs = ('bash', '-c', $command);
927     srun (\@srunargs, \@execargs, undef, $build_script);
928     # exec() failed, we assume nothing happened.
929     die "srun() failed on build script\n";
930   }
931   close("writer");
932   if (!defined $childpid)
933   {
934     close $reader{$id};
935     delete $reader{$id};
936     next;
937   }
938   shift @freeslot;
939   $proc{$childpid} = { jobstep => $id,
940                        time => time,
941                        slot => $childslot,
942                        jobstepname => "$job_id.$id.$childpid",
943                      };
944   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
945   $slot[$childslot]->{pid} = $childpid;
946
947   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
948   Log ($id, "child $childpid started on $childslotname");
949   $Jobstep->{starttime} = time;
950   $Jobstep->{node} = $childnode->{name};
951   $Jobstep->{slotindex} = $childslot;
952   delete $Jobstep->{stderr};
953   delete $Jobstep->{finishtime};
954
955   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
956   $Jobstep->{'arvados_task'}->save;
957
958   splice @jobstep_todo, $todo_ptr, 1;
959   --$todo_ptr;
960
961   $progress_is_dirty = 1;
962
963   while (!@freeslot
964          ||
965          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
966   {
967     last THISROUND if $main::please_freeze || defined($main::success);
968     if ($main::please_info)
969     {
970       $main::please_info = 0;
971       freeze();
972       create_output_collection();
973       save_meta(1);
974       update_progress_stats();
975     }
976     my $gotsome
977         = readfrompipes ()
978         + reapchildren ();
979     if (!$gotsome)
980     {
981       check_refresh_wanted();
982       check_squeue();
983       update_progress_stats();
984       select (undef, undef, undef, 0.1);
985     }
986     elsif (time - $progress_stats_updated >= 30)
987     {
988       update_progress_stats();
989     }
990     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
991         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
992     {
993       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
994           .($thisround_failed+$thisround_succeeded)
995           .") -- giving up on this round";
996       Log (undef, $message);
997       last THISROUND;
998     }
999
1000     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1001     for (my $i=$#freeslot; $i>=0; $i--) {
1002       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1003         push @holdslot, (splice @freeslot, $i, 1);
1004       }
1005     }
1006     for (my $i=$#holdslot; $i>=0; $i--) {
1007       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1008         push @freeslot, (splice @holdslot, $i, 1);
1009       }
1010     }
1011
1012     # give up if no nodes are succeeding
1013     if (!grep { $_->{node}->{losing_streak} == 0 &&
1014                     $_->{node}->{hold_count} < 4 } @slot) {
1015       my $message = "Every node has failed -- giving up on this round";
1016       Log (undef, $message);
1017       last THISROUND;
1018     }
1019   }
1020 }
1021
1022
1023 push @freeslot, splice @holdslot;
1024 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1025
1026
1027 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1028 while (%proc)
1029 {
1030   if ($main::please_continue) {
1031     $main::please_continue = 0;
1032     goto THISROUND;
1033   }
1034   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1035   readfrompipes ();
1036   if (!reapchildren())
1037   {
1038     check_refresh_wanted();
1039     check_squeue();
1040     update_progress_stats();
1041     select (undef, undef, undef, 0.1);
1042     killem (keys %proc) if $main::please_freeze;
1043   }
1044 }
1045
1046 update_progress_stats();
1047 freeze_if_want_freeze();
1048
1049
1050 if (!defined $main::success)
1051 {
1052   if (@jobstep_todo &&
1053       $thisround_succeeded == 0 &&
1054       ($thisround_failed == 0 || $thisround_failed > 4))
1055   {
1056     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1057     Log (undef, $message);
1058     $main::success = 0;
1059   }
1060   if (!@jobstep_todo)
1061   {
1062     $main::success = 1;
1063   }
1064 }
1065
1066 goto ONELEVEL if !defined $main::success;
1067
1068
1069 release_allocation();
1070 freeze();
1071 my $collated_output = &create_output_collection();
1072
1073 if (!$collated_output) {
1074   Log (undef, "Failed to write output collection");
1075 }
1076 else {
1077   Log(undef, "job output $collated_output");
1078   $Job->update_attributes('output' => $collated_output);
1079 }
1080
1081 Log (undef, "finish");
1082
1083 save_meta();
1084
1085 my $final_state;
1086 if ($collated_output && $main::success) {
1087   $final_state = 'Complete';
1088 } else {
1089   $final_state = 'Failed';
1090 }
1091 $Job->update_attributes('state' => $final_state);
1092
1093 exit (($final_state eq 'Complete') ? 0 : 1);
1094
1095
1096
1097 sub update_progress_stats
1098 {
1099   $progress_stats_updated = time;
1100   return if !$progress_is_dirty;
1101   my ($todo, $done, $running) = (scalar @jobstep_todo,
1102                                  scalar @jobstep_done,
1103                                  scalar @slot - scalar @freeslot - scalar @holdslot);
1104   $Job->{'tasks_summary'} ||= {};
1105   $Job->{'tasks_summary'}->{'todo'} = $todo;
1106   $Job->{'tasks_summary'}->{'done'} = $done;
1107   $Job->{'tasks_summary'}->{'running'} = $running;
1108   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1109   Log (undef, "status: $done done, $running running, $todo todo");
1110   $progress_is_dirty = 0;
1111 }
1112
1113
1114
1115 sub reapchildren
1116 {
1117   my $pid = waitpid (-1, WNOHANG);
1118   return 0 if $pid <= 0;
1119
1120   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1121                   . "."
1122                   . $slot[$proc{$pid}->{slot}]->{cpu});
1123   my $jobstepid = $proc{$pid}->{jobstep};
1124   my $elapsed = time - $proc{$pid}->{time};
1125   my $Jobstep = $jobstep[$jobstepid];
1126
1127   my $childstatus = $?;
1128   my $exitvalue = $childstatus >> 8;
1129   my $exitinfo = "exit ".exit_status_s($childstatus);
1130   $Jobstep->{'arvados_task'}->reload;
1131   my $task_success = $Jobstep->{'arvados_task'}->{success};
1132
1133   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1134
1135   if (!defined $task_success) {
1136     # task did not indicate one way or the other --> fail
1137     $Jobstep->{'arvados_task'}->{success} = 0;
1138     $Jobstep->{'arvados_task'}->save;
1139     $task_success = 0;
1140   }
1141
1142   if (!$task_success)
1143   {
1144     my $temporary_fail;
1145     $temporary_fail ||= $Jobstep->{node_fail};
1146     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1147
1148     ++$thisround_failed;
1149     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1150
1151     # Check for signs of a failed or misconfigured node
1152     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1153         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1154       # Don't count this against jobstep failure thresholds if this
1155       # node is already suspected faulty and srun exited quickly
1156       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1157           $elapsed < 5) {
1158         Log ($jobstepid, "blaming failure on suspect node " .
1159              $slot[$proc{$pid}->{slot}]->{node}->{name});
1160         $temporary_fail ||= 1;
1161       }
1162       ban_node_by_slot($proc{$pid}->{slot});
1163     }
1164
1165     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1166                              ++$Jobstep->{'failures'},
1167                              $temporary_fail ? 'temporary' : 'permanent',
1168                              $elapsed));
1169
1170     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1171       # Give up on this task, and the whole job
1172       $main::success = 0;
1173     }
1174     # Put this task back on the todo queue
1175     push @jobstep_todo, $jobstepid;
1176     $Job->{'tasks_summary'}->{'failed'}++;
1177   }
1178   else
1179   {
1180     ++$thisround_succeeded;
1181     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1182     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1183     push @jobstep_done, $jobstepid;
1184     Log ($jobstepid, "success in $elapsed seconds");
1185   }
1186   $Jobstep->{exitcode} = $childstatus;
1187   $Jobstep->{finishtime} = time;
1188   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1189   $Jobstep->{'arvados_task'}->save;
1190   process_stderr ($jobstepid, $task_success);
1191   Log ($jobstepid, sprintf("task output (%d bytes): %s",
1192                            length($Jobstep->{'arvados_task'}->{output}),
1193                            $Jobstep->{'arvados_task'}->{output}));
1194
1195   close $reader{$jobstepid};
1196   delete $reader{$jobstepid};
1197   delete $slot[$proc{$pid}->{slot}]->{pid};
1198   push @freeslot, $proc{$pid}->{slot};
1199   delete $proc{$pid};
1200
1201   if ($task_success) {
1202     # Load new tasks
1203     my $newtask_list = [];
1204     my $newtask_results;
1205     do {
1206       $newtask_results = api_call(
1207         "job_tasks/list",
1208         'where' => {
1209           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1210         },
1211         'order' => 'qsequence',
1212         'offset' => scalar(@$newtask_list),
1213       );
1214       push(@$newtask_list, @{$newtask_results->{items}});
1215     } while (@{$newtask_results->{items}});
1216     foreach my $arvados_task (@$newtask_list) {
1217       my $jobstep = {
1218         'level' => $arvados_task->{'sequence'},
1219         'failures' => 0,
1220         'arvados_task' => $arvados_task
1221       };
1222       push @jobstep, $jobstep;
1223       push @jobstep_todo, $#jobstep;
1224     }
1225   }
1226
1227   $progress_is_dirty = 1;
1228   1;
1229 }
1230
1231 sub check_refresh_wanted
1232 {
1233   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1234   if (@stat && $stat[9] > $latest_refresh) {
1235     $latest_refresh = scalar time;
1236     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1237     for my $attr ('cancelled_at',
1238                   'cancelled_by_user_uuid',
1239                   'cancelled_by_client_uuid',
1240                   'state') {
1241       $Job->{$attr} = $Job2->{$attr};
1242     }
1243     if ($Job->{'state'} ne "Running") {
1244       if ($Job->{'state'} eq "Cancelled") {
1245         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1246       } else {
1247         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1248       }
1249       $main::success = 0;
1250       $main::please_freeze = 1;
1251     }
1252   }
1253 }
1254
1255 sub check_squeue
1256 {
1257   # return if the kill list was checked <4 seconds ago
1258   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1259   {
1260     return;
1261   }
1262   $squeue_kill_checked = time;
1263
1264   # use killem() on procs whose killtime is reached
1265   for (keys %proc)
1266   {
1267     if (exists $proc{$_}->{killtime}
1268         && $proc{$_}->{killtime} <= time)
1269     {
1270       killem ($_);
1271     }
1272   }
1273
1274   # return if the squeue was checked <60 seconds ago
1275   if (defined $squeue_checked && $squeue_checked > time - 60)
1276   {
1277     return;
1278   }
1279   $squeue_checked = time;
1280
1281   if (!$have_slurm)
1282   {
1283     # here is an opportunity to check for mysterious problems with local procs
1284     return;
1285   }
1286
1287   # get a list of steps still running
1288   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1289   chop @squeue;
1290   if ($squeue[-1] ne "ok")
1291   {
1292     return;
1293   }
1294   pop @squeue;
1295
1296   # which of my jobsteps are running, according to squeue?
1297   my %ok;
1298   foreach (@squeue)
1299   {
1300     if (/^(\d+)\.(\d+) (\S+)/)
1301     {
1302       if ($1 eq $ENV{SLURM_JOBID})
1303       {
1304         $ok{$3} = 1;
1305       }
1306     }
1307   }
1308
1309   # which of my active child procs (>60s old) were not mentioned by squeue?
1310   foreach (keys %proc)
1311   {
1312     if ($proc{$_}->{time} < time - 60
1313         && !exists $ok{$proc{$_}->{jobstepname}}
1314         && !exists $proc{$_}->{killtime})
1315     {
1316       # kill this proc if it hasn't exited in 30 seconds
1317       $proc{$_}->{killtime} = time + 30;
1318     }
1319   }
1320 }
1321
1322
1323 sub release_allocation
1324 {
1325   if ($have_slurm)
1326   {
1327     Log (undef, "release job allocation");
1328     system "scancel $ENV{SLURM_JOBID}";
1329   }
1330 }
1331
1332
1333 sub readfrompipes
1334 {
1335   my $gotsome = 0;
1336   foreach my $job (keys %reader)
1337   {
1338     my $buf;
1339     while (0 < sysread ($reader{$job}, $buf, 8192))
1340     {
1341       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1342       $jobstep[$job]->{stderr} .= $buf;
1343       preprocess_stderr ($job);
1344       if (length ($jobstep[$job]->{stderr}) > 16384)
1345       {
1346         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1347       }
1348       $gotsome = 1;
1349     }
1350   }
1351   return $gotsome;
1352 }
1353
1354
1355 sub preprocess_stderr
1356 {
1357   my $job = shift;
1358
1359   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1360     my $line = $1;
1361     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1362     Log ($job, "stderr $line");
1363     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1364       # whoa.
1365       $main::please_freeze = 1;
1366     }
1367     elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
1368       $jobstep[$job]->{node_fail} = 1;
1369       ban_node_by_slot($jobstep[$job]->{slotindex});
1370     }
1371   }
1372 }
1373
1374
1375 sub process_stderr
1376 {
1377   my $job = shift;
1378   my $task_success = shift;
1379   preprocess_stderr ($job);
1380
1381   map {
1382     Log ($job, "stderr $_");
1383   } split ("\n", $jobstep[$job]->{stderr});
1384 }
1385
1386 sub fetch_block
1387 {
1388   my $hash = shift;
1389   my $keep;
1390   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1391     Log(undef, "fetch_block run error from arv-get $hash: $!");
1392     return undef;
1393   }
1394   my $output_block = "";
1395   while (1) {
1396     my $buf;
1397     my $bytes = sysread($keep, $buf, 1024 * 1024);
1398     if (!defined $bytes) {
1399       Log(undef, "fetch_block read error from arv-get: $!");
1400       $output_block = undef;
1401       last;
1402     } elsif ($bytes == 0) {
1403       # sysread returns 0 at the end of the pipe.
1404       last;
1405     } else {
1406       # some bytes were read into buf.
1407       $output_block .= $buf;
1408     }
1409   }
1410   close $keep;
1411   if ($?) {
1412     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1413     $output_block = undef;
1414   }
1415   return $output_block;
1416 }
1417
1418 # Create a collection by concatenating the output of all tasks (each
1419 # task's output is either a manifest fragment, a locator for a
1420 # manifest fragment stored in Keep, or nothing at all). Return the
1421 # portable_data_hash of the new collection.
1422 sub create_output_collection
1423 {
1424   Log (undef, "collate");
1425
1426   my ($child_out, $child_in);
1427   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1428 import arvados
1429 import sys
1430 print (arvados.api("v1").collections().
1431        create(body={"manifest_text": sys.stdin.read()}).
1432        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1433 }, retry_count());
1434
1435   my $task_idx = -1;
1436   my $manifest_size = 0;
1437   for (@jobstep)
1438   {
1439     ++$task_idx;
1440     my $output = $_->{'arvados_task'}->{output};
1441     next if (!defined($output));
1442     my $next_write;
1443     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1444       $next_write = fetch_block($output);
1445     } else {
1446       $next_write = $output;
1447     }
1448     if (defined($next_write)) {
1449       if (!defined(syswrite($child_in, $next_write))) {
1450         # There's been an error writing.  Stop the loop.
1451         # We'll log details about the exit code later.
1452         last;
1453       } else {
1454         $manifest_size += length($next_write);
1455       }
1456     } else {
1457       my $uuid = $_->{'arvados_task'}->{'uuid'};
1458       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1459       $main::success = 0;
1460     }
1461   }
1462   close($child_in);
1463   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1464
1465   my $joboutput;
1466   my $s = IO::Select->new($child_out);
1467   if ($s->can_read(120)) {
1468     sysread($child_out, $joboutput, 1024 * 1024);
1469     waitpid($pid, 0);
1470     if ($?) {
1471       Log(undef, "output collection creation exited " . exit_status_s($?));
1472       $joboutput = undef;
1473     } else {
1474       chomp($joboutput);
1475     }
1476   } else {
1477     Log (undef, "timed out while creating output collection");
1478     foreach my $signal (2, 2, 2, 15, 15, 9) {
1479       kill($signal, $pid);
1480       last if waitpid($pid, WNOHANG) == -1;
1481       sleep(1);
1482     }
1483   }
1484   close($child_out);
1485
1486   return $joboutput;
1487 }
1488
1489
1490 sub killem
1491 {
1492   foreach (@_)
1493   {
1494     my $sig = 2;                # SIGINT first
1495     if (exists $proc{$_}->{"sent_$sig"} &&
1496         time - $proc{$_}->{"sent_$sig"} > 4)
1497     {
1498       $sig = 15;                # SIGTERM if SIGINT doesn't work
1499     }
1500     if (exists $proc{$_}->{"sent_$sig"} &&
1501         time - $proc{$_}->{"sent_$sig"} > 4)
1502     {
1503       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1504     }
1505     if (!exists $proc{$_}->{"sent_$sig"})
1506     {
1507       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1508       kill $sig, $_;
1509       select (undef, undef, undef, 0.1);
1510       if ($sig == 2)
1511       {
1512         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1513       }
1514       $proc{$_}->{"sent_$sig"} = time;
1515       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1516     }
1517   }
1518 }
1519
1520
1521 sub fhbits
1522 {
1523   my($bits);
1524   for (@_) {
1525     vec($bits,fileno($_),1) = 1;
1526   }
1527   $bits;
1528 }
1529
1530
1531 # Send log output to Keep via arv-put.
1532 #
1533 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1534 # $log_pipe_pid is the pid of the arv-put subprocess.
1535 #
1536 # The only functions that should access these variables directly are:
1537 #
1538 # log_writer_start($logfilename)
1539 #     Starts an arv-put pipe, reading data on stdin and writing it to
1540 #     a $logfilename file in an output collection.
1541 #
1542 # log_writer_send($txt)
1543 #     Writes $txt to the output log collection.
1544 #
1545 # log_writer_finish()
1546 #     Closes the arv-put pipe and returns the output that it produces.
1547 #
1548 # log_writer_is_active()
1549 #     Returns a true value if there is currently a live arv-put
1550 #     process, false otherwise.
1551 #
1552 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1553
1554 sub log_writer_start($)
1555 {
1556   my $logfilename = shift;
1557   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1558                         'arv-put',
1559                         '--portable-data-hash',
1560                         '--project-uuid', $Job->{owner_uuid},
1561                         '--retries', '3',
1562                         '--name', $logfilename,
1563                         '--filename', $logfilename,
1564                         '-');
1565 }
1566
1567 sub log_writer_send($)
1568 {
1569   my $txt = shift;
1570   print $log_pipe_in $txt;
1571 }
1572
1573 sub log_writer_finish()
1574 {
1575   return unless $log_pipe_pid;
1576
1577   close($log_pipe_in);
1578   my $arv_put_output;
1579
1580   my $s = IO::Select->new($log_pipe_out);
1581   if ($s->can_read(120)) {
1582     sysread($log_pipe_out, $arv_put_output, 1024);
1583     chomp($arv_put_output);
1584   } else {
1585     Log (undef, "timed out reading from 'arv-put'");
1586   }
1587
1588   waitpid($log_pipe_pid, 0);
1589   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1590   if ($?) {
1591     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1592   }
1593
1594   return $arv_put_output;
1595 }
1596
1597 sub log_writer_is_active() {
1598   return $log_pipe_pid;
1599 }
1600
1601 sub Log                         # ($jobstep_id, $logmessage)
1602 {
1603   if ($_[1] =~ /\n/) {
1604     for my $line (split (/\n/, $_[1])) {
1605       Log ($_[0], $line);
1606     }
1607     return;
1608   }
1609   my $fh = select STDERR; $|=1; select $fh;
1610   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1611   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1612   $message .= "\n";
1613   my $datetime;
1614   if (log_writer_is_active() || -t STDERR) {
1615     my @gmtime = gmtime;
1616     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1617                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1618   }
1619   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1620
1621   if (log_writer_is_active()) {
1622     log_writer_send($datetime . " " . $message);
1623   }
1624 }
1625
1626
1627 sub croak
1628 {
1629   my ($package, $file, $line) = caller;
1630   my $message = "@_ at $file line $line\n";
1631   Log (undef, $message);
1632   freeze() if @jobstep_todo;
1633   create_output_collection() if @jobstep_todo;
1634   cleanup();
1635   save_meta();
1636   die;
1637 }
1638
1639
1640 sub cleanup
1641 {
1642   return unless $Job;
1643   if ($Job->{'state'} eq 'Cancelled') {
1644     $Job->update_attributes('finished_at' => scalar gmtime);
1645   } else {
1646     $Job->update_attributes('state' => 'Failed');
1647   }
1648 }
1649
1650
1651 sub save_meta
1652 {
1653   my $justcheckpoint = shift; # false if this will be the last meta saved
1654   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1655   return unless log_writer_is_active();
1656
1657   my $loglocator = log_writer_finish();
1658   Log (undef, "log manifest is $loglocator");
1659   $Job->{'log'} = $loglocator;
1660   $Job->update_attributes('log', $loglocator);
1661 }
1662
1663
1664 sub freeze_if_want_freeze
1665 {
1666   if ($main::please_freeze)
1667   {
1668     release_allocation();
1669     if (@_)
1670     {
1671       # kill some srun procs before freeze+stop
1672       map { $proc{$_} = {} } @_;
1673       while (%proc)
1674       {
1675         killem (keys %proc);
1676         select (undef, undef, undef, 0.1);
1677         my $died;
1678         while (($died = waitpid (-1, WNOHANG)) > 0)
1679         {
1680           delete $proc{$died};
1681         }
1682       }
1683     }
1684     freeze();
1685     create_output_collection();
1686     cleanup();
1687     save_meta();
1688     exit 1;
1689   }
1690 }
1691
1692
1693 sub freeze
1694 {
1695   Log (undef, "Freeze not implemented");
1696   return;
1697 }
1698
1699
1700 sub thaw
1701 {
1702   croak ("Thaw not implemented");
1703 }
1704
1705
1706 sub freezequote
1707 {
1708   my $s = shift;
1709   $s =~ s/\\/\\\\/g;
1710   $s =~ s/\n/\\n/g;
1711   return $s;
1712 }
1713
1714
1715 sub freezeunquote
1716 {
1717   my $s = shift;
1718   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1719   return $s;
1720 }
1721
1722
1723 sub srun
1724 {
1725   my $srunargs = shift;
1726   my $execargs = shift;
1727   my $opts = shift || {};
1728   my $stdin = shift;
1729   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1730
1731   $Data::Dumper::Terse = 1;
1732   $Data::Dumper::Indent = 0;
1733   my $show_cmd = Dumper($args);
1734   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1735   $show_cmd =~ s/\n/ /g;
1736   if ($opts->{fork}) {
1737     Log(undef, "starting: $show_cmd");
1738   } else {
1739     # This is a child process: parent is in charge of reading our
1740     # stderr and copying it to Log() if needed.
1741     warn "starting: $show_cmd\n";
1742   }
1743
1744   if (defined $stdin) {
1745     my $child = open STDIN, "-|";
1746     defined $child or die "no fork: $!";
1747     if ($child == 0) {
1748       print $stdin or die $!;
1749       close STDOUT or die $!;
1750       exit 0;
1751     }
1752   }
1753
1754   return system (@$args) if $opts->{fork};
1755
1756   exec @$args;
1757   warn "ENV size is ".length(join(" ",%ENV));
1758   die "exec failed: $!: @$args";
1759 }
1760
1761
1762 sub ban_node_by_slot {
1763   # Don't start any new jobsteps on this node for 60 seconds
1764   my $slotid = shift;
1765   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1766   $slot[$slotid]->{node}->{hold_count}++;
1767   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1768 }
1769
1770 sub must_lock_now
1771 {
1772   my ($lockfile, $error_message) = @_;
1773   open L, ">", $lockfile or croak("$lockfile: $!");
1774   if (!flock L, LOCK_EX|LOCK_NB) {
1775     croak("Can't lock $lockfile: $error_message\n");
1776   }
1777 }
1778
1779 sub find_docker_image {
1780   # Given a Keep locator, check to see if it contains a Docker image.
1781   # If so, return its stream name and Docker hash.
1782   # If not, return undef for both values.
1783   my $locator = shift;
1784   my ($streamname, $filename);
1785   my $image = api_call("collections/get", uuid => $locator);
1786   if ($image) {
1787     foreach my $line (split(/\n/, $image->{manifest_text})) {
1788       my @tokens = split(/\s+/, $line);
1789       next if (!@tokens);
1790       $streamname = shift(@tokens);
1791       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1792         if (defined($filename)) {
1793           return (undef, undef);  # More than one file in the Collection.
1794         } else {
1795           $filename = (split(/:/, $filedata, 3))[2];
1796         }
1797       }
1798     }
1799   }
1800   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1801     return ($streamname, $1);
1802   } else {
1803     return (undef, undef);
1804   }
1805 }
1806
1807 sub retry_count {
1808   # Calculate the number of times an operation should be retried,
1809   # assuming exponential backoff, and that we're willing to retry as
1810   # long as tasks have been running.  Enforce a minimum of 3 retries.
1811   my ($starttime, $endtime, $timediff, $retries);
1812   if (@jobstep) {
1813     $starttime = $jobstep[0]->{starttime};
1814     $endtime = $jobstep[-1]->{finishtime};
1815   }
1816   if (!defined($starttime)) {
1817     $timediff = 0;
1818   } elsif (!defined($endtime)) {
1819     $timediff = time - $starttime;
1820   } else {
1821     $timediff = ($endtime - $starttime) - (time - $endtime);
1822   }
1823   if ($timediff > 0) {
1824     $retries = int(log($timediff) / log(2));
1825   } else {
1826     $retries = 1;  # Use the minimum.
1827   }
1828   return ($retries > 3) ? $retries : 3;
1829 }
1830
1831 sub retry_op {
1832   # Pass in two function references.
1833   # This method will be called with the remaining arguments.
1834   # If it dies, retry it with exponential backoff until it succeeds,
1835   # or until the current retry_count is exhausted.  After each failure
1836   # that can be retried, the second function will be called with
1837   # the current try count (0-based), next try time, and error message.
1838   my $operation = shift;
1839   my $retry_callback = shift;
1840   my $retries = retry_count();
1841   foreach my $try_count (0..$retries) {
1842     my $next_try = time + (2 ** $try_count);
1843     my $result = eval { $operation->(@_); };
1844     if (!$@) {
1845       return $result;
1846     } elsif ($try_count < $retries) {
1847       $retry_callback->($try_count, $next_try, $@);
1848       my $sleep_time = $next_try - time;
1849       sleep($sleep_time) if ($sleep_time > 0);
1850     }
1851   }
1852   # Ensure the error message ends in a newline, so Perl doesn't add
1853   # retry_op's line number to it.
1854   chomp($@);
1855   die($@ . "\n");
1856 }
1857
1858 sub api_call {
1859   # Pass in a /-separated API method name, and arguments for it.
1860   # This function will call that method, retrying as needed until
1861   # the current retry_count is exhausted, with a log on the first failure.
1862   my $method_name = shift;
1863   my $log_api_retry = sub {
1864     my ($try_count, $next_try_at, $errmsg) = @_;
1865     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1866     $errmsg =~ s/\s/ /g;
1867     $errmsg =~ s/\s+$//;
1868     my $retry_msg;
1869     if ($next_try_at < time) {
1870       $retry_msg = "Retrying.";
1871     } else {
1872       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1873       $retry_msg = "Retrying at $next_try_fmt.";
1874     }
1875     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1876   };
1877   my $method = $arv;
1878   foreach my $key (split(/\//, $method_name)) {
1879     $method = $method->{$key};
1880   }
1881   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1882 }
1883
1884 sub exit_status_s {
1885   # Given a $?, return a human-readable exit code string like "0" or
1886   # "1" or "0 with signal 1" or "1 with signal 11".
1887   my $exitcode = shift;
1888   my $s = $exitcode >> 8;
1889   if ($exitcode & 0x7f) {
1890     $s .= " with signal " . ($exitcode & 0x7f);
1891   }
1892   if ($exitcode & 0x80) {
1893     $s .= " with core dump";
1894   }
1895   return $s;
1896 }
1897
1898 sub handle_readall {
1899   # Pass in a glob reference to a file handle.
1900   # Read all its contents and return them as a string.
1901   my $fh_glob_ref = shift;
1902   local $/ = undef;
1903   return <$fh_glob_ref>;
1904 }
1905
1906 sub tar_filename_n {
1907   my $n = shift;
1908   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1909 }
1910
1911 sub add_git_archive {
1912   # Pass in a git archive command as a string or list, a la system().
1913   # This method will save its output to be included in the archive sent to the
1914   # build script.
1915   my $git_input;
1916   $git_tar_count++;
1917   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1918     croak("Failed to save git archive: $!");
1919   }
1920   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1921   close($git_input);
1922   waitpid($git_pid, 0);
1923   close(GIT_ARCHIVE);
1924   if ($?) {
1925     croak("Failed to save git archive: git exited " . exit_status_s($?));
1926   }
1927 }
1928
1929 sub combined_git_archive {
1930   # Combine all saved tar archives into a single archive, then return its
1931   # contents in a string.  Return undef if no archives have been saved.
1932   if ($git_tar_count < 1) {
1933     return undef;
1934   }
1935   my $base_tar_name = tar_filename_n(1);
1936   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1937     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1938     if ($tar_exit != 0) {
1939       croak("Error preparing build archive: tar -A exited " .
1940             exit_status_s($tar_exit));
1941     }
1942   }
1943   if (!open(GIT_TAR, "<", $base_tar_name)) {
1944     croak("Could not open build archive: $!");
1945   }
1946   my $tar_contents = handle_readall(\*GIT_TAR);
1947   close(GIT_TAR);
1948   return $tar_contents;
1949 }
1950
1951 sub set_nonblocking {
1952   my $fh = shift;
1953   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
1954   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
1955 }
1956
1957 __DATA__
1958 #!/usr/bin/perl
1959 #
1960 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1961 # server invokes this script on individual compute nodes, or localhost if we're
1962 # running a job locally.  It gets called in two modes:
1963 #
1964 # * No arguments: Installation mode.  Read a tar archive from the DATA
1965 #   file handle; it includes the Crunch script's source code, and
1966 #   maybe SDKs as well.  Those should be installed in the proper
1967 #   locations.  This runs outside of any Docker container, so don't try to
1968 #   introspect Crunch's runtime environment.
1969 #
1970 # * With arguments: Crunch script run mode.  This script should set up the
1971 #   environment, then run the command specified in the arguments.  This runs
1972 #   inside any Docker container.
1973
1974 use Fcntl ':flock';
1975 use File::Path qw( make_path remove_tree );
1976 use POSIX qw(getcwd);
1977
1978 use constant TASK_TEMPFAIL => 111;
1979
1980 # Map SDK subdirectories to the path environments they belong to.
1981 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1982
1983 my $destdir = $ENV{"CRUNCH_SRC"};
1984 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
1985 my $repo = $ENV{"CRUNCH_SRC_URL"};
1986 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1987 my $job_work = $ENV{"JOB_WORK"};
1988 my $task_work = $ENV{"TASK_WORK"};
1989
1990 open(STDOUT_ORIG, ">&", STDOUT);
1991 open(STDERR_ORIG, ">&", STDERR);
1992
1993 for my $dir ($destdir, $job_work, $task_work) {
1994   if ($dir) {
1995     make_path $dir;
1996     -e $dir or die "Failed to create temporary directory ($dir): $!";
1997   }
1998 }
1999
2000 if ($task_work) {
2001   remove_tree($task_work, {keep_root => 1});
2002 }
2003
2004 ### Crunch script run mode
2005 if (@ARGV) {
2006   # We want to do routine logging during task 0 only.  This gives the user
2007   # the information they need, but avoids repeating the information for every
2008   # task.
2009   my $Log;
2010   if ($ENV{TASK_SEQUENCE} eq "0") {
2011     $Log = sub {
2012       my $msg = shift;
2013       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2014     };
2015   } else {
2016     $Log = sub { };
2017   }
2018
2019   my $python_src = "$install_dir/python";
2020   my $venv_dir = "$job_work/.arvados.venv";
2021   my $venv_built = -e "$venv_dir/bin/activate";
2022   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2023     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2024                  "--python=python2.7", $venv_dir);
2025     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2026     $venv_built = 1;
2027     $Log->("Built Python SDK virtualenv");
2028   }
2029
2030   my $pip_bin = "pip";
2031   if ($venv_built) {
2032     $Log->("Running in Python SDK virtualenv");
2033     $pip_bin = "$venv_dir/bin/pip";
2034     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2035     @ARGV = ("/bin/sh", "-ec",
2036              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2037   } elsif (-d $python_src) {
2038     $Log->("Warning: virtualenv not found inside Docker container default " .
2039            "\$PATH. Can't install Python SDK.");
2040   }
2041
2042   my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2043   if ($pkgs) {
2044     $Log->("Using Arvados SDK:");
2045     foreach my $line (split /\n/, $pkgs) {
2046       $Log->($line);
2047     }
2048   } else {
2049     $Log->("Arvados SDK packages not found");
2050   }
2051
2052   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2053     my $sdk_path = "$install_dir/$sdk_dir";
2054     if (-d $sdk_path) {
2055       if ($ENV{$sdk_envkey}) {
2056         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2057       } else {
2058         $ENV{$sdk_envkey} = $sdk_path;
2059       }
2060       $Log->("Arvados SDK added to %s", $sdk_envkey);
2061     }
2062   }
2063
2064   exec(@ARGV);
2065   die "Cannot exec `@ARGV`: $!";
2066 }
2067
2068 ### Installation mode
2069 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2070 flock L, LOCK_EX;
2071 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2072   # This exact git archive (source + arvados sdk) is already installed
2073   # here, so there's no need to reinstall it.
2074
2075   # We must consume our DATA section, though: otherwise the process
2076   # feeding it to us will get SIGPIPE.
2077   my $buf;
2078   while (read(DATA, $buf, 65536)) { }
2079
2080   exit(0);
2081 }
2082
2083 unlink "$destdir.archive_hash";
2084 mkdir $destdir;
2085
2086 do {
2087   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2088   local $SIG{PIPE} = "IGNORE";
2089   warn "Extracting archive: $archive_hash\n";
2090   # --ignore-zeros is necessary sometimes: depending on how much NUL
2091   # padding tar -A put on our combined archive (which in turn depends
2092   # on the length of the component archives) tar without
2093   # --ignore-zeros will exit before consuming stdin and cause close()
2094   # to fail on the resulting SIGPIPE.
2095   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2096     die "Error launching 'tar -xC $destdir': $!";
2097   }
2098   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2099   # get SIGPIPE.  We must feed it data incrementally.
2100   my $tar_input;
2101   while (read(DATA, $tar_input, 65536)) {
2102     print TARX $tar_input;
2103   }
2104   if(!close(TARX)) {
2105     die "'tar -xC $destdir' exited $?: $!";
2106   }
2107 };
2108
2109 mkdir $install_dir;
2110
2111 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2112 if (-d $sdk_root) {
2113   foreach my $sdk_lang (("python",
2114                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2115     if (-d "$sdk_root/$sdk_lang") {
2116       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2117         die "Failed to install $sdk_lang SDK: $!";
2118       }
2119     }
2120   }
2121 }
2122
2123 my $python_dir = "$install_dir/python";
2124 if ((-d $python_dir) and can_run("python2.7") and
2125     (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
2126   # egg_info failed, probably when it asked git for a build tag.
2127   # Specify no build tag.
2128   open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2129   print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2130   close($pysdk_cfg);
2131 }
2132
2133 # Hide messages from the install script (unless it fails: shell_or_die
2134 # will show $destdir.log in that case).
2135 open(STDOUT, ">>", "$destdir.log");
2136 open(STDERR, ">&", STDOUT);
2137
2138 if (-e "$destdir/crunch_scripts/install") {
2139     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2140 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2141     # Old version
2142     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2143 } elsif (-e "./install.sh") {
2144     shell_or_die (undef, "./install.sh", $install_dir);
2145 }
2146
2147 if ($archive_hash) {
2148     unlink "$destdir.archive_hash.new";
2149     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2150     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2151 }
2152
2153 close L;
2154
2155 sub can_run {
2156   my $command_name = shift;
2157   open(my $which, "-|", "which", $command_name);
2158   while (<$which>) { }
2159   close($which);
2160   return ($? == 0);
2161 }
2162
2163 sub shell_or_die
2164 {
2165   my $exitcode = shift;
2166
2167   if ($ENV{"DEBUG"}) {
2168     print STDERR "@_\n";
2169   }
2170   if (system (@_) != 0) {
2171     my $err = $!;
2172     my $code = $?;
2173     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2174     open STDERR, ">&STDERR_ORIG";
2175     system ("cat $destdir.log >&2");
2176     warn "@_ failed ($err): $exitstatus";
2177     if (defined($exitcode)) {
2178       exit $exitcode;
2179     }
2180     else {
2181       exit (($code >> 8) || 1);
2182     }
2183   }
2184 }
2185
2186 __DATA__