Merge branch 'master' into 4638-ssh-notification
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Data::Dumper;
90 use Digest::MD5 qw(md5_hex);
91 use Getopt::Long;
92 use IPC::Open2;
93 use IO::Select;
94 use File::Temp;
95 use Fcntl ':flock';
96 use File::Path qw( make_path remove_tree );
97
98 use constant EX_TEMPFAIL => 75;
99
100 $ENV{"TMPDIR"} ||= "/tmp";
101 unless (defined $ENV{"CRUNCH_TMP"}) {
102   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
103   if ($ENV{"USER"} ne "crunch" && $< != 0) {
104     # use a tmp dir unique for my uid
105     $ENV{"CRUNCH_TMP"} .= "-$<";
106   }
107 }
108
109 # Create the tmp directory if it does not exist
110 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
111   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
112 }
113
114 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
115 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
116 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
117 mkdir ($ENV{"JOB_WORK"});
118
119 my $force_unlock;
120 my $git_dir;
121 my $jobspec;
122 my $job_api_token;
123 my $no_clear_tmp;
124 my $resume_stash;
125 GetOptions('force-unlock' => \$force_unlock,
126            'git-dir=s' => \$git_dir,
127            'job=s' => \$jobspec,
128            'job-api-token=s' => \$job_api_token,
129            'no-clear-tmp' => \$no_clear_tmp,
130            'resume-stash=s' => \$resume_stash,
131     );
132
133 if (defined $job_api_token) {
134   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
135 }
136
137 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
138 my $local_job = 0;
139
140
141 $SIG{'USR1'} = sub
142 {
143   $main::ENV{CRUNCH_DEBUG} = 1;
144 };
145 $SIG{'USR2'} = sub
146 {
147   $main::ENV{CRUNCH_DEBUG} = 0;
148 };
149
150
151
152 my $arv = Arvados->new('apiVersion' => 'v1');
153
154 my $Job;
155 my $job_id;
156 my $dbh;
157 my $sth;
158 my @jobstep;
159
160 my $User = api_call("users/current");
161
162 if ($jobspec =~ /^[-a-z\d]+$/)
163 {
164   # $jobspec is an Arvados UUID, not a JSON job specification
165   $Job = api_call("jobs/get", uuid => $jobspec);
166   if (!$force_unlock) {
167     # Claim this job, and make sure nobody else does
168     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
169     if ($@) {
170       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
171       exit EX_TEMPFAIL;
172     };
173   }
174 }
175 else
176 {
177   $Job = JSON::decode_json($jobspec);
178
179   if (!$resume_stash)
180   {
181     map { croak ("No $_ specified") unless $Job->{$_} }
182     qw(script script_version script_parameters);
183   }
184
185   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186   $Job->{'started_at'} = gmtime;
187   $Job->{'state'} = 'Running';
188
189   $Job = api_call("jobs/create", job => $Job);
190 }
191 $job_id = $Job->{'uuid'};
192
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
195
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
199
200
201 Log (undef, "check slurm allocation");
202 my @slot;
203 my @node;
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
205 my @sinfo;
206 if (!$have_slurm)
207 {
208   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209   push @sinfo, "$localcpus localhost";
210 }
211 if (exists $ENV{SLURM_NODELIST})
212 {
213   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
214 }
215 foreach (@sinfo)
216 {
217   my ($ncpus, $slurm_nodelist) = split;
218   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
219
220   my @nodelist;
221   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
222   {
223     my $nodelist = $1;
224     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
225     {
226       my $ranges = $1;
227       foreach (split (",", $ranges))
228       {
229         my ($a, $b);
230         if (/(\d+)-(\d+)/)
231         {
232           $a = $1;
233           $b = $2;
234         }
235         else
236         {
237           $a = $_;
238           $b = $_;
239         }
240         push @nodelist, map {
241           my $n = $nodelist;
242           $n =~ s/\[[-,\d]+\]/$_/;
243           $n;
244         } ($a..$b);
245       }
246     }
247     else
248     {
249       push @nodelist, $nodelist;
250     }
251   }
252   foreach my $nodename (@nodelist)
253   {
254     Log (undef, "node $nodename - $ncpus slots");
255     my $node = { name => $nodename,
256                  ncpus => $ncpus,
257                  losing_streak => 0,
258                  hold_until => 0 };
259     foreach my $cpu (1..$ncpus)
260     {
261       push @slot, { node => $node,
262                     cpu => $cpu };
263     }
264   }
265   push @node, @nodelist;
266 }
267
268
269
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
272
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274
275
276 $Job->update_attributes(
277   'tasks_summary' => { 'failed' => 0,
278                        'todo' => 1,
279                        'running' => 0,
280                        'done' => 0 });
281
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
296
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
300
301
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
306 my $squeue_checked;
307 my $squeue_kill_checked;
308 my $latest_refresh = scalar time;
309
310
311
312 if (defined $Job->{thawedfromkey})
313 {
314   thaw ($Job->{thawedfromkey});
315 }
316 else
317 {
318   my $first_task = api_call("job_tasks/create", job_task => {
319     'job_uuid' => $Job->{'uuid'},
320     'sequence' => 0,
321     'qsequence' => 0,
322     'parameters' => {},
323   });
324   push @jobstep, { 'level' => 0,
325                    'failures' => 0,
326                    'arvados_task' => $first_task,
327                  };
328   push @jobstep_todo, 0;
329 }
330
331
332 if (!$have_slurm)
333 {
334   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
335 }
336
337
338 my $build_script;
339 do {
340   local $/ = undef;
341   $build_script = <DATA>;
342 };
343 my $nodelist = join(",", @node);
344
345 if (!defined $no_clear_tmp) {
346   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
347   Log (undef, "Clean work dirs");
348
349   my $cleanpid = fork();
350   if ($cleanpid == 0)
351   {
352     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
353           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
354     exit (1);
355   }
356   while (1)
357   {
358     last if $cleanpid == waitpid (-1, WNOHANG);
359     freeze_if_want_freeze ($cleanpid);
360     select (undef, undef, undef, 0.1);
361   }
362   Log (undef, "Cleanup command exited ".exit_status_s($?));
363 }
364
365
366 my $git_archive;
367 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
368   # If script_version looks like an absolute path, *and* the --git-dir
369   # argument was not given -- which implies we were not invoked by
370   # crunch-dispatch -- we will use the given path as a working
371   # directory instead of resolving script_version to a git commit (or
372   # doing anything else with git).
373   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
374   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
375 }
376 else {
377   # Resolve the given script_version to a git commit sha1. Also, if
378   # the repository is remote, clone it into our local filesystem: this
379   # ensures "git archive" will work, and is necessary to reliably
380   # resolve a symbolic script_version like "master^".
381   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
382
383   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
384
385   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
386
387   # If we're running under crunch-dispatch, it will have already
388   # pulled the appropriate source tree into its own repository, and
389   # given us that repo's path as $git_dir.
390   #
391   # If we're running a "local" job, we might have to fetch content
392   # from a remote repository.
393   #
394   # (Currently crunch-dispatch gives a local path with --git-dir, but
395   # we might as well accept URLs there too in case it changes its
396   # mind.)
397   my $repo = $git_dir || $Job->{'repository'};
398
399   # Repository can be remote or local. If remote, we'll need to fetch it
400   # to a local dir before doing `git log` et al.
401   my $repo_location;
402
403   if ($repo =~ m{://|^[^/]*:}) {
404     # $repo is a git url we can clone, like git:// or https:// or
405     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
406     # not recognized here because distinguishing that from a local
407     # path is too fragile. If you really need something strange here,
408     # use the ssh:// form.
409     $repo_location = 'remote';
410   } elsif ($repo =~ m{^\.*/}) {
411     # $repo is a local path to a git index. We'll also resolve ../foo
412     # to ../foo/.git if the latter is a directory. To help
413     # disambiguate local paths from named hosted repositories, this
414     # form must be given as ./ or ../ if it's a relative path.
415     if (-d "$repo/.git") {
416       $repo = "$repo/.git";
417     }
418     $repo_location = 'local';
419   } else {
420     # $repo is none of the above. It must be the name of a hosted
421     # repository.
422     my $arv_repo_list = api_call("repositories/list",
423                                  'filters' => [['name','=',$repo]]);
424     my @repos_found = @{$arv_repo_list->{'items'}};
425     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
426     if ($n_found > 0) {
427       Log(undef, "Repository '$repo' -> "
428           . join(", ", map { $_->{'uuid'} } @repos_found));
429     }
430     if ($n_found != 1) {
431       croak("Error: Found $n_found repositories with name '$repo'.");
432     }
433     $repo = $repos_found[0]->{'fetch_url'};
434     $repo_location = 'remote';
435   }
436   Log(undef, "Using $repo_location repository '$repo'");
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   # Resolve given script_version (we'll call that $treeish here) to a
440   # commit sha1 ($commit).
441   my $treeish = $Job->{'script_version'};
442   my $commit;
443   if ($repo_location eq 'remote') {
444     # We minimize excess object-fetching by re-using the same bare
445     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
446     # just keep adding remotes to it as needed.
447     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
448     my $gitcmd = "git --git-dir=\Q$local_repo\E";
449
450     # Set up our local repo for caching remote objects, making
451     # archives, etc.
452     if (!-d $local_repo) {
453       make_path($local_repo) or croak("Error: could not create $local_repo");
454     }
455     # This works (exits 0 and doesn't delete fetched objects) even
456     # if $local_repo is already initialized:
457     `$gitcmd init --bare`;
458     if ($?) {
459       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
460     }
461
462     # If $treeish looks like a hash (or abbrev hash) we look it up in
463     # our local cache first, since that's cheaper. (We don't want to
464     # do that with tags/branches though -- those change over time, so
465     # they should always be resolved by the remote repo.)
466     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
467       # Hide stderr because it's normal for this to fail:
468       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
469       if ($? == 0 &&
470           # Careful not to resolve a branch named abcdeff to commit 1234567:
471           $sha1 =~ /^$treeish/ &&
472           $sha1 =~ /^([0-9a-f]{40})$/s) {
473         $commit = $1;
474         Log(undef, "Commit $commit already present in $local_repo");
475       }
476     }
477
478     if (!defined $commit) {
479       # If $treeish isn't just a hash or abbrev hash, or isn't here
480       # yet, we need to fetch the remote to resolve it correctly.
481
482       # First, remove all local heads. This prevents a name that does
483       # not exist on the remote from resolving to (or colliding with)
484       # a previously fetched branch or tag (possibly from a different
485       # remote).
486       remove_tree("$local_repo/refs/heads", {keep_root => 1});
487
488       Log(undef, "Fetching objects from $repo to $local_repo");
489       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
490       if ($?) {
491         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
492       }
493     }
494
495     # Now that the data is all here, we will use our local repo for
496     # the rest of our git activities.
497     $repo = $local_repo;
498   }
499
500   my $gitcmd = "git --git-dir=\Q$repo\E";
501   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
502   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
503     croak("`$gitcmd rev-list` exited "
504           .exit_status_s($?)
505           .", '$treeish' not found. Giving up.");
506   }
507   $commit = $1;
508   Log(undef, "Version $treeish is commit $commit");
509
510   if ($commit ne $Job->{'script_version'}) {
511     # Record the real commit id in the database, frozentokey, logs,
512     # etc. -- instead of an abbreviation or a branch name which can
513     # become ambiguous or point to a different commit in the future.
514     if (!$Job->update_attributes('script_version' => $commit)) {
515       croak("Error: failed to update job's script_version attribute");
516     }
517   }
518
519   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
520   $git_archive = `$gitcmd archive ''\Q$commit\E`;
521   if ($?) {
522     croak("Error: $gitcmd archive exited ".exit_status_s($?));
523   }
524 }
525
526 if (!defined $git_archive) {
527   Log(undef, "Skip install phase (no git archive)");
528   if ($have_slurm) {
529     Log(undef, "Warning: This probably means workers have no source tree!");
530   }
531 }
532 else {
533   Log(undef, "Run install script on all workers");
534
535   my @srunargs = ("srun",
536                   "--nodelist=$nodelist",
537                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
538   my @execargs = ("sh", "-c",
539                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
540
541   my $installpid = fork();
542   if ($installpid == 0)
543   {
544     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
545     exit (1);
546   }
547   while (1)
548   {
549     last if $installpid == waitpid (-1, WNOHANG);
550     freeze_if_want_freeze ($installpid);
551     select (undef, undef, undef, 0.1);
552   }
553   my $install_exited = $?;
554   Log (undef, "Install script exited ".exit_status_s($install_exited));
555   exit (1) if $install_exited != 0;
556 }
557
558 if (!$have_slurm)
559 {
560   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
561   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
562 }
563
564 # If this job requires a Docker image, install that.
565 my $docker_bin = "/usr/bin/docker.io";
566 my ($docker_locator, $docker_stream, $docker_hash);
567 if ($docker_locator = $Job->{docker_image_locator}) {
568   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
569   if (!$docker_hash)
570   {
571     croak("No Docker image hash found from locator $docker_locator");
572   }
573   $docker_stream =~ s/^\.//;
574   my $docker_install_script = qq{
575 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
576     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
577 fi
578 };
579   my $docker_pid = fork();
580   if ($docker_pid == 0)
581   {
582     srun (["srun", "--nodelist=" . join(',', @node)],
583           ["/bin/sh", "-ec", $docker_install_script]);
584     exit ($?);
585   }
586   while (1)
587   {
588     last if $docker_pid == waitpid (-1, WNOHANG);
589     freeze_if_want_freeze ($docker_pid);
590     select (undef, undef, undef, 0.1);
591   }
592   if ($? != 0)
593   {
594     croak("Installing Docker image from $docker_locator exited "
595           .exit_status_s($?));
596   }
597 }
598
599 foreach (qw (script script_version script_parameters runtime_constraints))
600 {
601   Log (undef,
602        "$_ " .
603        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
604 }
605 foreach (split (/\n/, $Job->{knobs}))
606 {
607   Log (undef, "knob " . $_);
608 }
609
610
611
612 $main::success = undef;
613
614
615
616 ONELEVEL:
617
618 my $thisround_succeeded = 0;
619 my $thisround_failed = 0;
620 my $thisround_failed_multiple = 0;
621
622 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
623                        or $a <=> $b } @jobstep_todo;
624 my $level = $jobstep[$jobstep_todo[0]]->{level};
625 Log (undef, "start level $level");
626
627
628
629 my %proc;
630 my @freeslot = (0..$#slot);
631 my @holdslot;
632 my %reader;
633 my $progress_is_dirty = 1;
634 my $progress_stats_updated = 0;
635
636 update_progress_stats();
637
638
639
640 THISROUND:
641 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
642 {
643   my $id = $jobstep_todo[$todo_ptr];
644   my $Jobstep = $jobstep[$id];
645   if ($Jobstep->{level} != $level)
646   {
647     next;
648   }
649
650   pipe $reader{$id}, "writer" or croak ($!);
651   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
652   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
653
654   my $childslot = $freeslot[0];
655   my $childnode = $slot[$childslot]->{node};
656   my $childslotname = join (".",
657                             $slot[$childslot]->{node}->{name},
658                             $slot[$childslot]->{cpu});
659   my $childpid = fork();
660   if ($childpid == 0)
661   {
662     $SIG{'INT'} = 'DEFAULT';
663     $SIG{'QUIT'} = 'DEFAULT';
664     $SIG{'TERM'} = 'DEFAULT';
665
666     foreach (values (%reader))
667     {
668       close($_);
669     }
670     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
671     open(STDOUT,">&writer");
672     open(STDERR,">&writer");
673
674     undef $dbh;
675     undef $sth;
676
677     delete $ENV{"GNUPGHOME"};
678     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
679     $ENV{"TASK_QSEQUENCE"} = $id;
680     $ENV{"TASK_SEQUENCE"} = $level;
681     $ENV{"JOB_SCRIPT"} = $Job->{script};
682     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
683       $param =~ tr/a-z/A-Z/;
684       $ENV{"JOB_PARAMETER_$param"} = $value;
685     }
686     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
687     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
688     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
689     $ENV{"HOME"} = $ENV{"TASK_WORK"};
690     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
691     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
692     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
693     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
694
695     $ENV{"GZIP"} = "-n";
696
697     my @srunargs = (
698       "srun",
699       "--nodelist=".$childnode->{name},
700       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
701       "--job-name=$job_id.$id.$$",
702         );
703     my $command =
704         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
705         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
706         ."&& cd $ENV{CRUNCH_TMP} ";
707     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
708     if ($docker_hash)
709     {
710       my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
711       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
712       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
713
714       # Dynamically configure the container to use the host system as its
715       # DNS server.  Get the host's global addresses from the ip command,
716       # and turn them into docker --dns options using gawk.
717       $command .=
718           q{$(ip -o address show scope global |
719               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
720
721       # The source tree and $destdir directory (which we have
722       # installed on the worker host) are available in the container,
723       # under the same path.
724       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
725       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
726
727       # Currently, we make arv-mount's mount point appear at /keep
728       # inside the container (instead of using the same path as the
729       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
730       # crunch scripts and utilities must not rely on this. They must
731       # use $TASK_KEEPMOUNT.
732       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
733       $ENV{TASK_KEEPMOUNT} = "/keep";
734
735       # TASK_WORK is almost exactly like a docker data volume: it
736       # starts out empty, is writable, and persists until no
737       # containers use it any more. We don't use --volumes-from to
738       # share it with other containers: it is only accessible to this
739       # task, and it goes away when this task stops.
740       #
741       # However, a docker data volume is writable only by root unless
742       # the mount point already happens to exist in the container with
743       # different permissions. Therefore, we [1] assume /tmp already
744       # exists in the image and is writable by the crunch user; [2]
745       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
746       # writable if they are created by docker while setting up the
747       # other --volumes); and [3] create $TASK_WORK inside the
748       # container using $build_script.
749       $command .= "--volume=/tmp ";
750       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
751       $ENV{"HOME"} = $ENV{"TASK_WORK"};
752       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
753
754       # TODO: Share a single JOB_WORK volume across all task
755       # containers on a given worker node, and delete it when the job
756       # ends (and, in case that doesn't work, when the next job
757       # starts).
758       #
759       # For now, use the same approach as TASK_WORK above.
760       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
761
762       while (my ($env_key, $env_val) = each %ENV)
763       {
764         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
765           $command .= "--env=\Q$env_key=$env_val\E ";
766         }
767       }
768       $command .= "--env=\QHOME=$ENV{HOME}\E ";
769       $command .= "\Q$docker_hash\E ";
770       $command .= "stdbuf --output=0 --error=0 ";
771       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
772     } else {
773       # Non-docker run
774       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
775       $command .= "stdbuf --output=0 --error=0 ";
776       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
777     }
778
779     my @execargs = ('bash', '-c', $command);
780     srun (\@srunargs, \@execargs, undef, $build_script);
781     # exec() failed, we assume nothing happened.
782     die "srun() failed on build script\n";
783   }
784   close("writer");
785   if (!defined $childpid)
786   {
787     close $reader{$id};
788     delete $reader{$id};
789     next;
790   }
791   shift @freeslot;
792   $proc{$childpid} = { jobstep => $id,
793                        time => time,
794                        slot => $childslot,
795                        jobstepname => "$job_id.$id.$childpid",
796                      };
797   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
798   $slot[$childslot]->{pid} = $childpid;
799
800   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
801   Log ($id, "child $childpid started on $childslotname");
802   $Jobstep->{starttime} = time;
803   $Jobstep->{node} = $childnode->{name};
804   $Jobstep->{slotindex} = $childslot;
805   delete $Jobstep->{stderr};
806   delete $Jobstep->{finishtime};
807
808   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
809   $Jobstep->{'arvados_task'}->save;
810
811   splice @jobstep_todo, $todo_ptr, 1;
812   --$todo_ptr;
813
814   $progress_is_dirty = 1;
815
816   while (!@freeslot
817          ||
818          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
819   {
820     last THISROUND if $main::please_freeze;
821     if ($main::please_info)
822     {
823       $main::please_info = 0;
824       freeze();
825       create_output_collection();
826       save_meta(1);
827       update_progress_stats();
828     }
829     my $gotsome
830         = readfrompipes ()
831         + reapchildren ();
832     if (!$gotsome)
833     {
834       check_refresh_wanted();
835       check_squeue();
836       update_progress_stats();
837       select (undef, undef, undef, 0.1);
838     }
839     elsif (time - $progress_stats_updated >= 30)
840     {
841       update_progress_stats();
842     }
843     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
844         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
845     {
846       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
847           .($thisround_failed+$thisround_succeeded)
848           .") -- giving up on this round";
849       Log (undef, $message);
850       last THISROUND;
851     }
852
853     # move slots from freeslot to holdslot (or back to freeslot) if necessary
854     for (my $i=$#freeslot; $i>=0; $i--) {
855       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
856         push @holdslot, (splice @freeslot, $i, 1);
857       }
858     }
859     for (my $i=$#holdslot; $i>=0; $i--) {
860       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
861         push @freeslot, (splice @holdslot, $i, 1);
862       }
863     }
864
865     # give up if no nodes are succeeding
866     if (!grep { $_->{node}->{losing_streak} == 0 &&
867                     $_->{node}->{hold_count} < 4 } @slot) {
868       my $message = "Every node has failed -- giving up on this round";
869       Log (undef, $message);
870       last THISROUND;
871     }
872   }
873 }
874
875
876 push @freeslot, splice @holdslot;
877 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
878
879
880 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
881 while (%proc)
882 {
883   if ($main::please_continue) {
884     $main::please_continue = 0;
885     goto THISROUND;
886   }
887   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
888   readfrompipes ();
889   if (!reapchildren())
890   {
891     check_refresh_wanted();
892     check_squeue();
893     update_progress_stats();
894     select (undef, undef, undef, 0.1);
895     killem (keys %proc) if $main::please_freeze;
896   }
897 }
898
899 update_progress_stats();
900 freeze_if_want_freeze();
901
902
903 if (!defined $main::success)
904 {
905   if (@jobstep_todo &&
906       $thisround_succeeded == 0 &&
907       ($thisround_failed == 0 || $thisround_failed > 4))
908   {
909     my $message = "stop because $thisround_failed tasks failed and none succeeded";
910     Log (undef, $message);
911     $main::success = 0;
912   }
913   if (!@jobstep_todo)
914   {
915     $main::success = 1;
916   }
917 }
918
919 goto ONELEVEL if !defined $main::success;
920
921
922 release_allocation();
923 freeze();
924 my $collated_output = &create_output_collection();
925
926 if (!$collated_output) {
927   Log (undef, "Failed to write output collection");
928 }
929 else {
930   Log(undef, "output hash " . $collated_output);
931   $Job->update_attributes('output' => $collated_output);
932 }
933
934 Log (undef, "finish");
935
936 save_meta();
937
938 my $final_state;
939 if ($collated_output && $main::success) {
940   $final_state = 'Complete';
941 } else {
942   $final_state = 'Failed';
943 }
944 $Job->update_attributes('state' => $final_state);
945
946 exit (($final_state eq 'Complete') ? 0 : 1);
947
948
949
950 sub update_progress_stats
951 {
952   $progress_stats_updated = time;
953   return if !$progress_is_dirty;
954   my ($todo, $done, $running) = (scalar @jobstep_todo,
955                                  scalar @jobstep_done,
956                                  scalar @slot - scalar @freeslot - scalar @holdslot);
957   $Job->{'tasks_summary'} ||= {};
958   $Job->{'tasks_summary'}->{'todo'} = $todo;
959   $Job->{'tasks_summary'}->{'done'} = $done;
960   $Job->{'tasks_summary'}->{'running'} = $running;
961   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
962   Log (undef, "status: $done done, $running running, $todo todo");
963   $progress_is_dirty = 0;
964 }
965
966
967
968 sub reapchildren
969 {
970   my $pid = waitpid (-1, WNOHANG);
971   return 0 if $pid <= 0;
972
973   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
974                   . "."
975                   . $slot[$proc{$pid}->{slot}]->{cpu});
976   my $jobstepid = $proc{$pid}->{jobstep};
977   my $elapsed = time - $proc{$pid}->{time};
978   my $Jobstep = $jobstep[$jobstepid];
979
980   my $childstatus = $?;
981   my $exitvalue = $childstatus >> 8;
982   my $exitinfo = "exit ".exit_status_s($childstatus);
983   $Jobstep->{'arvados_task'}->reload;
984   my $task_success = $Jobstep->{'arvados_task'}->{success};
985
986   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
987
988   if (!defined $task_success) {
989     # task did not indicate one way or the other --> fail
990     $Jobstep->{'arvados_task'}->{success} = 0;
991     $Jobstep->{'arvados_task'}->save;
992     $task_success = 0;
993   }
994
995   if (!$task_success)
996   {
997     my $temporary_fail;
998     $temporary_fail ||= $Jobstep->{node_fail};
999     $temporary_fail ||= ($exitvalue == 111);
1000
1001     ++$thisround_failed;
1002     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1003
1004     # Check for signs of a failed or misconfigured node
1005     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1006         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1007       # Don't count this against jobstep failure thresholds if this
1008       # node is already suspected faulty and srun exited quickly
1009       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1010           $elapsed < 5) {
1011         Log ($jobstepid, "blaming failure on suspect node " .
1012              $slot[$proc{$pid}->{slot}]->{node}->{name});
1013         $temporary_fail ||= 1;
1014       }
1015       ban_node_by_slot($proc{$pid}->{slot});
1016     }
1017
1018     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1019                              ++$Jobstep->{'failures'},
1020                              $temporary_fail ? 'temporary ' : 'permanent',
1021                              $elapsed));
1022
1023     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1024       # Give up on this task, and the whole job
1025       $main::success = 0;
1026       $main::please_freeze = 1;
1027     }
1028     # Put this task back on the todo queue
1029     push @jobstep_todo, $jobstepid;
1030     $Job->{'tasks_summary'}->{'failed'}++;
1031   }
1032   else
1033   {
1034     ++$thisround_succeeded;
1035     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1036     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1037     push @jobstep_done, $jobstepid;
1038     Log ($jobstepid, "success in $elapsed seconds");
1039   }
1040   $Jobstep->{exitcode} = $childstatus;
1041   $Jobstep->{finishtime} = time;
1042   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1043   $Jobstep->{'arvados_task'}->save;
1044   process_stderr ($jobstepid, $task_success);
1045   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1046
1047   close $reader{$jobstepid};
1048   delete $reader{$jobstepid};
1049   delete $slot[$proc{$pid}->{slot}]->{pid};
1050   push @freeslot, $proc{$pid}->{slot};
1051   delete $proc{$pid};
1052
1053   if ($task_success) {
1054     # Load new tasks
1055     my $newtask_list = [];
1056     my $newtask_results;
1057     do {
1058       $newtask_results = api_call(
1059         "job_tasks/list",
1060         'where' => {
1061           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1062         },
1063         'order' => 'qsequence',
1064         'offset' => scalar(@$newtask_list),
1065       );
1066       push(@$newtask_list, @{$newtask_results->{items}});
1067     } while (@{$newtask_results->{items}});
1068     foreach my $arvados_task (@$newtask_list) {
1069       my $jobstep = {
1070         'level' => $arvados_task->{'sequence'},
1071         'failures' => 0,
1072         'arvados_task' => $arvados_task
1073       };
1074       push @jobstep, $jobstep;
1075       push @jobstep_todo, $#jobstep;
1076     }
1077   }
1078
1079   $progress_is_dirty = 1;
1080   1;
1081 }
1082
1083 sub check_refresh_wanted
1084 {
1085   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1086   if (@stat && $stat[9] > $latest_refresh) {
1087     $latest_refresh = scalar time;
1088     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1089     for my $attr ('cancelled_at',
1090                   'cancelled_by_user_uuid',
1091                   'cancelled_by_client_uuid',
1092                   'state') {
1093       $Job->{$attr} = $Job2->{$attr};
1094     }
1095     if ($Job->{'state'} ne "Running") {
1096       if ($Job->{'state'} eq "Cancelled") {
1097         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1098       } else {
1099         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1100       }
1101       $main::success = 0;
1102       $main::please_freeze = 1;
1103     }
1104   }
1105 }
1106
1107 sub check_squeue
1108 {
1109   # return if the kill list was checked <4 seconds ago
1110   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1111   {
1112     return;
1113   }
1114   $squeue_kill_checked = time;
1115
1116   # use killem() on procs whose killtime is reached
1117   for (keys %proc)
1118   {
1119     if (exists $proc{$_}->{killtime}
1120         && $proc{$_}->{killtime} <= time)
1121     {
1122       killem ($_);
1123     }
1124   }
1125
1126   # return if the squeue was checked <60 seconds ago
1127   if (defined $squeue_checked && $squeue_checked > time - 60)
1128   {
1129     return;
1130   }
1131   $squeue_checked = time;
1132
1133   if (!$have_slurm)
1134   {
1135     # here is an opportunity to check for mysterious problems with local procs
1136     return;
1137   }
1138
1139   # get a list of steps still running
1140   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1141   chop @squeue;
1142   if ($squeue[-1] ne "ok")
1143   {
1144     return;
1145   }
1146   pop @squeue;
1147
1148   # which of my jobsteps are running, according to squeue?
1149   my %ok;
1150   foreach (@squeue)
1151   {
1152     if (/^(\d+)\.(\d+) (\S+)/)
1153     {
1154       if ($1 eq $ENV{SLURM_JOBID})
1155       {
1156         $ok{$3} = 1;
1157       }
1158     }
1159   }
1160
1161   # which of my active child procs (>60s old) were not mentioned by squeue?
1162   foreach (keys %proc)
1163   {
1164     if ($proc{$_}->{time} < time - 60
1165         && !exists $ok{$proc{$_}->{jobstepname}}
1166         && !exists $proc{$_}->{killtime})
1167     {
1168       # kill this proc if it hasn't exited in 30 seconds
1169       $proc{$_}->{killtime} = time + 30;
1170     }
1171   }
1172 }
1173
1174
1175 sub release_allocation
1176 {
1177   if ($have_slurm)
1178   {
1179     Log (undef, "release job allocation");
1180     system "scancel $ENV{SLURM_JOBID}";
1181   }
1182 }
1183
1184
1185 sub readfrompipes
1186 {
1187   my $gotsome = 0;
1188   foreach my $job (keys %reader)
1189   {
1190     my $buf;
1191     while (0 < sysread ($reader{$job}, $buf, 8192))
1192     {
1193       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1194       $jobstep[$job]->{stderr} .= $buf;
1195       preprocess_stderr ($job);
1196       if (length ($jobstep[$job]->{stderr}) > 16384)
1197       {
1198         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1199       }
1200       $gotsome = 1;
1201     }
1202   }
1203   return $gotsome;
1204 }
1205
1206
1207 sub preprocess_stderr
1208 {
1209   my $job = shift;
1210
1211   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1212     my $line = $1;
1213     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1214     Log ($job, "stderr $line");
1215     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1216       # whoa.
1217       $main::please_freeze = 1;
1218     }
1219     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1220       $jobstep[$job]->{node_fail} = 1;
1221       ban_node_by_slot($jobstep[$job]->{slotindex});
1222     }
1223   }
1224 }
1225
1226
1227 sub process_stderr
1228 {
1229   my $job = shift;
1230   my $task_success = shift;
1231   preprocess_stderr ($job);
1232
1233   map {
1234     Log ($job, "stderr $_");
1235   } split ("\n", $jobstep[$job]->{stderr});
1236 }
1237
1238 sub fetch_block
1239 {
1240   my $hash = shift;
1241   my ($keep, $child_out, $output_block);
1242
1243   my $cmd = "arv-get \Q$hash\E";
1244   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1245   $output_block = '';
1246   while (1) {
1247     my $buf;
1248     my $bytes = sysread($keep, $buf, 1024 * 1024);
1249     if (!defined $bytes) {
1250       die "reading from arv-get: $!";
1251     } elsif ($bytes == 0) {
1252       # sysread returns 0 at the end of the pipe.
1253       last;
1254     } else {
1255       # some bytes were read into buf.
1256       $output_block .= $buf;
1257     }
1258   }
1259   close $keep;
1260   return $output_block;
1261 }
1262
1263 # create_output_collections generates a new collection containing the
1264 # output of each successfully completed task, and returns the
1265 # portable_data_hash for the new collection.
1266 #
1267 sub create_output_collection
1268 {
1269   Log (undef, "collate");
1270
1271   my ($child_out, $child_in);
1272   my $pid = open2($child_out, $child_in, 'python', '-c',
1273                   'import arvados; ' .
1274                   'import sys; ' .
1275                   'print arvados.api()' .
1276                   '.collections()' .
1277                   '.create(body={"manifest_text":sys.stdin.read()})' .
1278                   '.execute()["portable_data_hash"]'
1279       );
1280
1281   for (@jobstep)
1282   {
1283     next if (!exists $_->{'arvados_task'}->{'output'} ||
1284              !$_->{'arvados_task'}->{'success'});
1285     my $output = $_->{'arvados_task'}->{output};
1286     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1287     {
1288       print $child_in $output;
1289     }
1290     elsif (defined (my $outblock = fetch_block ($output)))
1291     {
1292       print $child_in $outblock;
1293     }
1294     else
1295     {
1296       Log (undef, "XXX fetch_block($output) failed XXX");
1297       $main::success = 0;
1298     }
1299   }
1300   $child_in->close;
1301
1302   my $joboutput;
1303   my $s = IO::Select->new($child_out);
1304   if ($s->can_read(120)) {
1305     sysread($child_out, $joboutput, 64 * 1024 * 1024);
1306     chomp($joboutput);
1307     # TODO: Ensure exit status == 0.
1308   } else {
1309     Log (undef, "timed out while creating output collection");
1310   }
1311   # TODO: kill $pid instead of waiting, now that we've decided to
1312   # ignore further output.
1313   waitpid($pid, 0);
1314
1315   return $joboutput;
1316 }
1317
1318
1319 sub killem
1320 {
1321   foreach (@_)
1322   {
1323     my $sig = 2;                # SIGINT first
1324     if (exists $proc{$_}->{"sent_$sig"} &&
1325         time - $proc{$_}->{"sent_$sig"} > 4)
1326     {
1327       $sig = 15;                # SIGTERM if SIGINT doesn't work
1328     }
1329     if (exists $proc{$_}->{"sent_$sig"} &&
1330         time - $proc{$_}->{"sent_$sig"} > 4)
1331     {
1332       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1333     }
1334     if (!exists $proc{$_}->{"sent_$sig"})
1335     {
1336       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1337       kill $sig, $_;
1338       select (undef, undef, undef, 0.1);
1339       if ($sig == 2)
1340       {
1341         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1342       }
1343       $proc{$_}->{"sent_$sig"} = time;
1344       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1345     }
1346   }
1347 }
1348
1349
1350 sub fhbits
1351 {
1352   my($bits);
1353   for (@_) {
1354     vec($bits,fileno($_),1) = 1;
1355   }
1356   $bits;
1357 }
1358
1359
1360 # Send log output to Keep via arv-put.
1361 #
1362 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1363 # $log_pipe_pid is the pid of the arv-put subprocess.
1364 #
1365 # The only functions that should access these variables directly are:
1366 #
1367 # log_writer_start($logfilename)
1368 #     Starts an arv-put pipe, reading data on stdin and writing it to
1369 #     a $logfilename file in an output collection.
1370 #
1371 # log_writer_send($txt)
1372 #     Writes $txt to the output log collection.
1373 #
1374 # log_writer_finish()
1375 #     Closes the arv-put pipe and returns the output that it produces.
1376 #
1377 # log_writer_is_active()
1378 #     Returns a true value if there is currently a live arv-put
1379 #     process, false otherwise.
1380 #
1381 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1382
1383 sub log_writer_start($)
1384 {
1385   my $logfilename = shift;
1386   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1387                         'arv-put', '--portable-data-hash',
1388                         '--retries', '3',
1389                         '--filename', $logfilename,
1390                         '-');
1391 }
1392
1393 sub log_writer_send($)
1394 {
1395   my $txt = shift;
1396   print $log_pipe_in $txt;
1397 }
1398
1399 sub log_writer_finish()
1400 {
1401   return unless $log_pipe_pid;
1402
1403   close($log_pipe_in);
1404   my $arv_put_output;
1405
1406   my $s = IO::Select->new($log_pipe_out);
1407   if ($s->can_read(120)) {
1408     sysread($log_pipe_out, $arv_put_output, 1024);
1409     chomp($arv_put_output);
1410   } else {
1411     Log (undef, "timed out reading from 'arv-put'");
1412   }
1413
1414   waitpid($log_pipe_pid, 0);
1415   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1416   if ($?) {
1417     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1418   }
1419
1420   return $arv_put_output;
1421 }
1422
1423 sub log_writer_is_active() {
1424   return $log_pipe_pid;
1425 }
1426
1427 sub Log                         # ($jobstep_id, $logmessage)
1428 {
1429   if ($_[1] =~ /\n/) {
1430     for my $line (split (/\n/, $_[1])) {
1431       Log ($_[0], $line);
1432     }
1433     return;
1434   }
1435   my $fh = select STDERR; $|=1; select $fh;
1436   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1437   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1438   $message .= "\n";
1439   my $datetime;
1440   if (log_writer_is_active() || -t STDERR) {
1441     my @gmtime = gmtime;
1442     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1443                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1444   }
1445   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1446
1447   if (log_writer_is_active()) {
1448     log_writer_send($datetime . " " . $message);
1449   }
1450 }
1451
1452
1453 sub croak
1454 {
1455   my ($package, $file, $line) = caller;
1456   my $message = "@_ at $file line $line\n";
1457   Log (undef, $message);
1458   freeze() if @jobstep_todo;
1459   create_output_collection() if @jobstep_todo;
1460   cleanup();
1461   save_meta();
1462   die;
1463 }
1464
1465
1466 sub cleanup
1467 {
1468   return unless $Job;
1469   if ($Job->{'state'} eq 'Cancelled') {
1470     $Job->update_attributes('finished_at' => scalar gmtime);
1471   } else {
1472     $Job->update_attributes('state' => 'Failed');
1473   }
1474 }
1475
1476
1477 sub save_meta
1478 {
1479   my $justcheckpoint = shift; # false if this will be the last meta saved
1480   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1481   return unless log_writer_is_active();
1482
1483   my $loglocator = log_writer_finish();
1484   Log (undef, "log manifest is $loglocator");
1485   $Job->{'log'} = $loglocator;
1486   $Job->update_attributes('log', $loglocator);
1487 }
1488
1489
1490 sub freeze_if_want_freeze
1491 {
1492   if ($main::please_freeze)
1493   {
1494     release_allocation();
1495     if (@_)
1496     {
1497       # kill some srun procs before freeze+stop
1498       map { $proc{$_} = {} } @_;
1499       while (%proc)
1500       {
1501         killem (keys %proc);
1502         select (undef, undef, undef, 0.1);
1503         my $died;
1504         while (($died = waitpid (-1, WNOHANG)) > 0)
1505         {
1506           delete $proc{$died};
1507         }
1508       }
1509     }
1510     freeze();
1511     create_output_collection();
1512     cleanup();
1513     save_meta();
1514     exit 1;
1515   }
1516 }
1517
1518
1519 sub freeze
1520 {
1521   Log (undef, "Freeze not implemented");
1522   return;
1523 }
1524
1525
1526 sub thaw
1527 {
1528   croak ("Thaw not implemented");
1529 }
1530
1531
1532 sub freezequote
1533 {
1534   my $s = shift;
1535   $s =~ s/\\/\\\\/g;
1536   $s =~ s/\n/\\n/g;
1537   return $s;
1538 }
1539
1540
1541 sub freezeunquote
1542 {
1543   my $s = shift;
1544   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1545   return $s;
1546 }
1547
1548
1549 sub srun
1550 {
1551   my $srunargs = shift;
1552   my $execargs = shift;
1553   my $opts = shift || {};
1554   my $stdin = shift;
1555   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1556
1557   $Data::Dumper::Terse = 1;
1558   $Data::Dumper::Indent = 0;
1559   my $show_cmd = Dumper($args);
1560   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1561   $show_cmd =~ s/\n/ /g;
1562   warn "starting: $show_cmd\n";
1563
1564   if (defined $stdin) {
1565     my $child = open STDIN, "-|";
1566     defined $child or die "no fork: $!";
1567     if ($child == 0) {
1568       print $stdin or die $!;
1569       close STDOUT or die $!;
1570       exit 0;
1571     }
1572   }
1573
1574   return system (@$args) if $opts->{fork};
1575
1576   exec @$args;
1577   warn "ENV size is ".length(join(" ",%ENV));
1578   die "exec failed: $!: @$args";
1579 }
1580
1581
1582 sub ban_node_by_slot {
1583   # Don't start any new jobsteps on this node for 60 seconds
1584   my $slotid = shift;
1585   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1586   $slot[$slotid]->{node}->{hold_count}++;
1587   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1588 }
1589
1590 sub must_lock_now
1591 {
1592   my ($lockfile, $error_message) = @_;
1593   open L, ">", $lockfile or croak("$lockfile: $!");
1594   if (!flock L, LOCK_EX|LOCK_NB) {
1595     croak("Can't lock $lockfile: $error_message\n");
1596   }
1597 }
1598
1599 sub find_docker_image {
1600   # Given a Keep locator, check to see if it contains a Docker image.
1601   # If so, return its stream name and Docker hash.
1602   # If not, return undef for both values.
1603   my $locator = shift;
1604   my ($streamname, $filename);
1605   my $image = api_call("collections/get", uuid => $locator);
1606   if ($image) {
1607     foreach my $line (split(/\n/, $image->{manifest_text})) {
1608       my @tokens = split(/\s+/, $line);
1609       next if (!@tokens);
1610       $streamname = shift(@tokens);
1611       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1612         if (defined($filename)) {
1613           return (undef, undef);  # More than one file in the Collection.
1614         } else {
1615           $filename = (split(/:/, $filedata, 3))[2];
1616         }
1617       }
1618     }
1619   }
1620   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1621     return ($streamname, $1);
1622   } else {
1623     return (undef, undef);
1624   }
1625 }
1626
1627 sub retry_count {
1628   # Calculate the number of times an operation should be retried,
1629   # assuming exponential backoff, and that we're willing to retry as
1630   # long as tasks have been running.  Enforce a minimum of 3 retries.
1631   my ($starttime, $endtime, $timediff, $retries);
1632   if (@jobstep) {
1633     $starttime = $jobstep[0]->{starttime};
1634     $endtime = $jobstep[-1]->{finishtime};
1635   }
1636   if (!defined($starttime)) {
1637     $timediff = 0;
1638   } elsif (!defined($endtime)) {
1639     $timediff = time - $starttime;
1640   } else {
1641     $timediff = ($endtime - $starttime) - (time - $endtime);
1642   }
1643   if ($timediff > 0) {
1644     $retries = int(log($timediff) / log(2));
1645   } else {
1646     $retries = 1;  # Use the minimum.
1647   }
1648   return ($retries > 3) ? $retries : 3;
1649 }
1650
1651 sub retry_op {
1652   # Pass in two function references.
1653   # This method will be called with the remaining arguments.
1654   # If it dies, retry it with exponential backoff until it succeeds,
1655   # or until the current retry_count is exhausted.  After each failure
1656   # that can be retried, the second function will be called with
1657   # the current try count (0-based), next try time, and error message.
1658   my $operation = shift;
1659   my $retry_callback = shift;
1660   my $retries = retry_count();
1661   foreach my $try_count (0..$retries) {
1662     my $next_try = time + (2 ** $try_count);
1663     my $result = eval { $operation->(@_); };
1664     if (!$@) {
1665       return $result;
1666     } elsif ($try_count < $retries) {
1667       $retry_callback->($try_count, $next_try, $@);
1668       my $sleep_time = $next_try - time;
1669       sleep($sleep_time) if ($sleep_time > 0);
1670     }
1671   }
1672   # Ensure the error message ends in a newline, so Perl doesn't add
1673   # retry_op's line number to it.
1674   chomp($@);
1675   die($@ . "\n");
1676 }
1677
1678 sub api_call {
1679   # Pass in a /-separated API method name, and arguments for it.
1680   # This function will call that method, retrying as needed until
1681   # the current retry_count is exhausted, with a log on the first failure.
1682   my $method_name = shift;
1683   my $log_api_retry = sub {
1684     my ($try_count, $next_try_at, $errmsg) = @_;
1685     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1686     $errmsg =~ s/\s/ /g;
1687     $errmsg =~ s/\s+$//;
1688     my $retry_msg;
1689     if ($next_try_at < time) {
1690       $retry_msg = "Retrying.";
1691     } else {
1692       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1693       $retry_msg = "Retrying at $next_try_fmt.";
1694     }
1695     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1696   };
1697   my $method = $arv;
1698   foreach my $key (split(/\//, $method_name)) {
1699     $method = $method->{$key};
1700   }
1701   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1702 }
1703
1704 sub exit_status_s {
1705   # Given a $?, return a human-readable exit code string like "0" or
1706   # "1" or "0 with signal 1" or "1 with signal 11".
1707   my $exitcode = shift;
1708   my $s = $exitcode >> 8;
1709   if ($exitcode & 0x7f) {
1710     $s .= " with signal " . ($exitcode & 0x7f);
1711   }
1712   if ($exitcode & 0x80) {
1713     $s .= " with core dump";
1714   }
1715   return $s;
1716 }
1717
1718 __DATA__
1719 #!/usr/bin/perl
1720
1721 # checkout-and-build
1722
1723 use Fcntl ':flock';
1724 use File::Path qw( make_path remove_tree );
1725
1726 my $destdir = $ENV{"CRUNCH_SRC"};
1727 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1728 my $repo = $ENV{"CRUNCH_SRC_URL"};
1729 my $job_work = $ENV{"JOB_WORK"};
1730 my $task_work = $ENV{"TASK_WORK"};
1731
1732 for my $dir ($destdir, $job_work, $task_work) {
1733   if ($dir) {
1734     make_path $dir;
1735     -e $dir or die "Failed to create temporary directory ($dir): $!";
1736   }
1737 }
1738
1739 if ($task_work) {
1740   remove_tree($task_work, {keep_root => 1});
1741 }
1742
1743 my @git_archive_data = <DATA>;
1744 if (!@git_archive_data) {
1745   # Nothing to extract -> nothing to install.
1746   run_argv_and_exit();
1747 }
1748
1749 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1750 flock L, LOCK_EX;
1751 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1752   # This version already installed -> nothing to do.
1753   run_argv_and_exit();
1754 }
1755
1756 unlink "$destdir.commit";
1757 open STDERR_ORIG, ">&STDERR";
1758 open STDOUT, ">", "$destdir.log";
1759 open STDERR, ">&STDOUT";
1760
1761 mkdir $destdir;
1762 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1763 print TARX @git_archive_data;
1764 if(!close(TARX)) {
1765   die "'tar -C $destdir -xf -' exited $?: $!";
1766 }
1767
1768 my $pwd;
1769 chomp ($pwd = `pwd`);
1770 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1771 mkdir $install_dir;
1772
1773 for my $src_path ("$destdir/arvados/sdk/python") {
1774   if (-d $src_path) {
1775     shell_or_die ("virtualenv", $install_dir);
1776     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1777   }
1778 }
1779
1780 if (-e "$destdir/crunch_scripts/install") {
1781     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1782 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1783     # Old version
1784     shell_or_die ("./tests/autotests.sh", $install_dir);
1785 } elsif (-e "./install.sh") {
1786     shell_or_die ("./install.sh", $install_dir);
1787 }
1788
1789 if ($commit) {
1790     unlink "$destdir.commit.new";
1791     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1792     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1793 }
1794
1795 close L;
1796
1797 run_argv_and_exit();
1798
1799 sub run_argv_and_exit
1800 {
1801   if (@ARGV) {
1802     exec(@ARGV);
1803     die "Cannot exec `@ARGV`: $!";
1804   } else {
1805     exit 0;
1806   }
1807 }
1808
1809 sub shell_or_die
1810 {
1811   if ($ENV{"DEBUG"}) {
1812     print STDERR "@_\n";
1813   }
1814   if (system (@_) != 0) {
1815     my $err = $!;
1816     my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
1817     open STDERR, ">&STDERR_ORIG";
1818     system ("cat $destdir.log >&2");
1819     die "@_ failed ($err): $exitstatus";
1820   }
1821 }
1822
1823 __DATA__