fff5e7bbe307fbc01f27177ac6a5141630ce69e3
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Digest::MD5 qw(md5_hex);
90 use Getopt::Long;
91 use IPC::Open2;
92 use IO::Select;
93 use File::Temp;
94 use Fcntl ':flock';
95 use File::Path qw( make_path remove_tree );
96
97 use constant EX_TEMPFAIL => 75;
98
99 $ENV{"TMPDIR"} ||= "/tmp";
100 unless (defined $ENV{"CRUNCH_TMP"}) {
101   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
102   if ($ENV{"USER"} ne "crunch" && $< != 0) {
103     # use a tmp dir unique for my uid
104     $ENV{"CRUNCH_TMP"} .= "-$<";
105   }
106 }
107
108 # Create the tmp directory if it does not exist
109 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
110   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
111 }
112
113 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
114 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
115 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
116 mkdir ($ENV{"JOB_WORK"});
117
118 my $force_unlock;
119 my $git_dir;
120 my $jobspec;
121 my $job_api_token;
122 my $no_clear_tmp;
123 my $resume_stash;
124 GetOptions('force-unlock' => \$force_unlock,
125            'git-dir=s' => \$git_dir,
126            'job=s' => \$jobspec,
127            'job-api-token=s' => \$job_api_token,
128            'no-clear-tmp' => \$no_clear_tmp,
129            'resume-stash=s' => \$resume_stash,
130     );
131
132 if (defined $job_api_token) {
133   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
134 }
135
136 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
137 my $local_job = 0;
138
139
140 $SIG{'USR1'} = sub
141 {
142   $main::ENV{CRUNCH_DEBUG} = 1;
143 };
144 $SIG{'USR2'} = sub
145 {
146   $main::ENV{CRUNCH_DEBUG} = 0;
147 };
148
149
150
151 my $arv = Arvados->new('apiVersion' => 'v1');
152
153 my $Job;
154 my $job_id;
155 my $dbh;
156 my $sth;
157 my @jobstep;
158
159 my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
160
161 if ($jobspec =~ /^[-a-z\d]+$/)
162 {
163   # $jobspec is an Arvados UUID, not a JSON job specification
164   $Job = retry_op(sub {
165     $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
166   });
167   if (!$force_unlock) {
168     # Claim this job, and make sure nobody else does
169     eval { retry_op(sub {
170       # lock() sets is_locked_by_uuid and changes state to Running.
171       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
172     }); };
173     if ($@) {
174       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
175       exit EX_TEMPFAIL;
176     };
177   }
178 }
179 else
180 {
181   $Job = JSON::decode_json($jobspec);
182
183   if (!$resume_stash)
184   {
185     map { croak ("No $_ specified") unless $Job->{$_} }
186     qw(script script_version script_parameters);
187   }
188
189   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
190   $Job->{'started_at'} = gmtime;
191   $Job->{'state'} = 'Running';
192
193   $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
194 }
195 $job_id = $Job->{'uuid'};
196
197 my $keep_logfile = $job_id . '.log.txt';
198 log_writer_start($keep_logfile);
199
200 $Job->{'runtime_constraints'} ||= {};
201 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
202 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
203
204
205 Log (undef, "check slurm allocation");
206 my @slot;
207 my @node;
208 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
209 my @sinfo;
210 if (!$have_slurm)
211 {
212   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
213   push @sinfo, "$localcpus localhost";
214 }
215 if (exists $ENV{SLURM_NODELIST})
216 {
217   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
218 }
219 foreach (@sinfo)
220 {
221   my ($ncpus, $slurm_nodelist) = split;
222   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
223
224   my @nodelist;
225   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
226   {
227     my $nodelist = $1;
228     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
229     {
230       my $ranges = $1;
231       foreach (split (",", $ranges))
232       {
233         my ($a, $b);
234         if (/(\d+)-(\d+)/)
235         {
236           $a = $1;
237           $b = $2;
238         }
239         else
240         {
241           $a = $_;
242           $b = $_;
243         }
244         push @nodelist, map {
245           my $n = $nodelist;
246           $n =~ s/\[[-,\d]+\]/$_/;
247           $n;
248         } ($a..$b);
249       }
250     }
251     else
252     {
253       push @nodelist, $nodelist;
254     }
255   }
256   foreach my $nodename (@nodelist)
257   {
258     Log (undef, "node $nodename - $ncpus slots");
259     my $node = { name => $nodename,
260                  ncpus => $ncpus,
261                  losing_streak => 0,
262                  hold_until => 0 };
263     foreach my $cpu (1..$ncpus)
264     {
265       push @slot, { node => $node,
266                     cpu => $cpu };
267     }
268   }
269   push @node, @nodelist;
270 }
271
272
273
274 # Ensure that we get one jobstep running on each allocated node before
275 # we start overloading nodes with concurrent steps
276
277 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
278
279
280 $Job->update_attributes(
281   'tasks_summary' => { 'failed' => 0,
282                        'todo' => 1,
283                        'running' => 0,
284                        'done' => 0 });
285
286 Log (undef, "start");
287 $SIG{'INT'} = sub { $main::please_freeze = 1; };
288 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
289 $SIG{'TERM'} = \&croak;
290 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
291 $SIG{'ALRM'} = sub { $main::please_info = 1; };
292 $SIG{'CONT'} = sub { $main::please_continue = 1; };
293 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
294
295 $main::please_freeze = 0;
296 $main::please_info = 0;
297 $main::please_continue = 0;
298 $main::please_refresh = 0;
299 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
300
301 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
302 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
303 $ENV{"JOB_UUID"} = $job_id;
304
305
306 my @jobstep_todo = ();
307 my @jobstep_done = ();
308 my @jobstep_tomerge = ();
309 my $jobstep_tomerge_level = 0;
310 my $squeue_checked;
311 my $squeue_kill_checked;
312 my $output_in_keep = 0;
313 my $latest_refresh = scalar time;
314
315
316
317 if (defined $Job->{thawedfromkey})
318 {
319   thaw ($Job->{thawedfromkey});
320 }
321 else
322 {
323   my $first_task = retry_op(sub {
324     $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
325       'job_uuid' => $Job->{'uuid'},
326       'sequence' => 0,
327       'qsequence' => 0,
328       'parameters' => {},
329     });
330   });
331   push @jobstep, { 'level' => 0,
332                    'failures' => 0,
333                    'arvados_task' => $first_task,
334                  };
335   push @jobstep_todo, 0;
336 }
337
338
339 if (!$have_slurm)
340 {
341   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
342 }
343
344
345 my $build_script;
346 do {
347   local $/ = undef;
348   $build_script = <DATA>;
349 };
350 my $nodelist = join(",", @node);
351
352 if (!defined $no_clear_tmp) {
353   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
354   Log (undef, "Clean work dirs");
355
356   my $cleanpid = fork();
357   if ($cleanpid == 0)
358   {
359     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
360           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
361     exit (1);
362   }
363   while (1)
364   {
365     last if $cleanpid == waitpid (-1, WNOHANG);
366     freeze_if_want_freeze ($cleanpid);
367     select (undef, undef, undef, 0.1);
368   }
369   Log (undef, "Cleanup command exited ".exit_status_s($?));
370 }
371
372
373 my $git_archive;
374 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
375   # If script_version looks like an absolute path, *and* the --git-dir
376   # argument was not given -- which implies we were not invoked by
377   # crunch-dispatch -- we will use the given path as a working
378   # directory instead of resolving script_version to a git commit (or
379   # doing anything else with git).
380   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
381   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
382 }
383 else {
384   # Resolve the given script_version to a git commit sha1. Also, if
385   # the repository is remote, clone it into our local filesystem: this
386   # ensures "git archive" will work, and is necessary to reliably
387   # resolve a symbolic script_version like "master^".
388   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
389
390   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
391
392   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
393
394   # If we're running under crunch-dispatch, it will have already
395   # pulled the appropriate source tree into its own repository, and
396   # given us that repo's path as $git_dir.
397   #
398   # If we're running a "local" job, we might have to fetch content
399   # from a remote repository.
400   #
401   # (Currently crunch-dispatch gives a local path with --git-dir, but
402   # we might as well accept URLs there too in case it changes its
403   # mind.)
404   my $repo = $git_dir || $Job->{'repository'};
405
406   # Repository can be remote or local. If remote, we'll need to fetch it
407   # to a local dir before doing `git log` et al.
408   my $repo_location;
409
410   if ($repo =~ m{://|^[^/]*:}) {
411     # $repo is a git url we can clone, like git:// or https:// or
412     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
413     # not recognized here because distinguishing that from a local
414     # path is too fragile. If you really need something strange here,
415     # use the ssh:// form.
416     $repo_location = 'remote';
417   } elsif ($repo =~ m{^\.*/}) {
418     # $repo is a local path to a git index. We'll also resolve ../foo
419     # to ../foo/.git if the latter is a directory. To help
420     # disambiguate local paths from named hosted repositories, this
421     # form must be given as ./ or ../ if it's a relative path.
422     if (-d "$repo/.git") {
423       $repo = "$repo/.git";
424     }
425     $repo_location = 'local';
426   } else {
427     # $repo is none of the above. It must be the name of a hosted
428     # repository.
429     my $arv_repo_list = retry_op(sub {
430       $arv->{'repositories'}->{'list'}->execute(
431         'filters' => [['name','=',$repo]]);
432     });
433     my @repos_found = @{$arv_repo_list->{'items'}};
434     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
435     if ($n_found > 0) {
436       Log(undef, "Repository '$repo' -> "
437           . join(", ", map { $_->{'uuid'} } @repos_found));
438     }
439     if ($n_found != 1) {
440       croak("Error: Found $n_found repositories with name '$repo'.");
441     }
442     $repo = $repos_found[0]->{'fetch_url'};
443     $repo_location = 'remote';
444   }
445   Log(undef, "Using $repo_location repository '$repo'");
446   $ENV{"CRUNCH_SRC_URL"} = $repo;
447
448   # Resolve given script_version (we'll call that $treeish here) to a
449   # commit sha1 ($commit).
450   my $treeish = $Job->{'script_version'};
451   my $commit;
452   if ($repo_location eq 'remote') {
453     # We minimize excess object-fetching by re-using the same bare
454     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
455     # just keep adding remotes to it as needed.
456     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
457     my $gitcmd = "git --git-dir=\Q$local_repo\E";
458
459     # Set up our local repo for caching remote objects, making
460     # archives, etc.
461     if (!-d $local_repo) {
462       make_path($local_repo) or croak("Error: could not create $local_repo");
463     }
464     # This works (exits 0 and doesn't delete fetched objects) even
465     # if $local_repo is already initialized:
466     `$gitcmd init --bare`;
467     if ($?) {
468       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
469     }
470
471     # If $treeish looks like a hash (or abbrev hash) we look it up in
472     # our local cache first, since that's cheaper. (We don't want to
473     # do that with tags/branches though -- those change over time, so
474     # they should always be resolved by the remote repo.)
475     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
476       # Hide stderr because it's normal for this to fail:
477       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
478       if ($? == 0 &&
479           # Careful not to resolve a branch named abcdeff to commit 1234567:
480           $sha1 =~ /^$treeish/ &&
481           $sha1 =~ /^([0-9a-f]{40})$/s) {
482         $commit = $1;
483         Log(undef, "Commit $commit already present in $local_repo");
484       }
485     }
486
487     if (!defined $commit) {
488       # If $treeish isn't just a hash or abbrev hash, or isn't here
489       # yet, we need to fetch the remote to resolve it correctly.
490
491       # First, remove all local heads. This prevents a name that does
492       # not exist on the remote from resolving to (or colliding with)
493       # a previously fetched branch or tag (possibly from a different
494       # remote).
495       remove_tree("$local_repo/refs/heads", {keep_root => 1});
496
497       Log(undef, "Fetching objects from $repo to $local_repo");
498       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
499       if ($?) {
500         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
501       }
502     }
503
504     # Now that the data is all here, we will use our local repo for
505     # the rest of our git activities.
506     $repo = $local_repo;
507   }
508
509   my $gitcmd = "git --git-dir=\Q$repo\E";
510   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
511   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
512     croak("`$gitcmd rev-list` exited "
513           .exit_status_s($?)
514           .", '$treeish' not found. Giving up.");
515   }
516   $commit = $1;
517   Log(undef, "Version $treeish is commit $commit");
518
519   if ($commit ne $Job->{'script_version'}) {
520     # Record the real commit id in the database, frozentokey, logs,
521     # etc. -- instead of an abbreviation or a branch name which can
522     # become ambiguous or point to a different commit in the future.
523     if (!$Job->update_attributes('script_version' => $commit)) {
524       croak("Error: failed to update job's script_version attribute");
525     }
526   }
527
528   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
529   $git_archive = `$gitcmd archive ''\Q$commit\E`;
530   if ($?) {
531     croak("Error: $gitcmd archive exited ".exit_status_s($?));
532   }
533 }
534
535 if (!defined $git_archive) {
536   Log(undef, "Skip install phase (no git archive)");
537   if ($have_slurm) {
538     Log(undef, "Warning: This probably means workers have no source tree!");
539   }
540 }
541 else {
542   Log(undef, "Run install script on all workers");
543
544   my @srunargs = ("srun",
545                   "--nodelist=$nodelist",
546                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
547   my @execargs = ("sh", "-c",
548                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
549
550   my $installpid = fork();
551   if ($installpid == 0)
552   {
553     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
554     exit (1);
555   }
556   while (1)
557   {
558     last if $installpid == waitpid (-1, WNOHANG);
559     freeze_if_want_freeze ($installpid);
560     select (undef, undef, undef, 0.1);
561   }
562   Log (undef, "Install script exited ".exit_status_s($?));
563 }
564
565 if (!$have_slurm)
566 {
567   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
568   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
569 }
570
571 # If this job requires a Docker image, install that.
572 my $docker_bin = "/usr/bin/docker.io";
573 my ($docker_locator, $docker_stream, $docker_hash);
574 if ($docker_locator = $Job->{docker_image_locator}) {
575   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
576   if (!$docker_hash)
577   {
578     croak("No Docker image hash found from locator $docker_locator");
579   }
580   $docker_stream =~ s/^\.//;
581   my $docker_install_script = qq{
582 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
583     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
584 fi
585 };
586   my $docker_pid = fork();
587   if ($docker_pid == 0)
588   {
589     srun (["srun", "--nodelist=" . join(',', @node)],
590           ["/bin/sh", "-ec", $docker_install_script]);
591     exit ($?);
592   }
593   while (1)
594   {
595     last if $docker_pid == waitpid (-1, WNOHANG);
596     freeze_if_want_freeze ($docker_pid);
597     select (undef, undef, undef, 0.1);
598   }
599   if ($? != 0)
600   {
601     croak("Installing Docker image from $docker_locator exited "
602           .exit_status_s($?));
603   }
604 }
605
606 foreach (qw (script script_version script_parameters runtime_constraints))
607 {
608   Log (undef,
609        "$_ " .
610        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
611 }
612 foreach (split (/\n/, $Job->{knobs}))
613 {
614   Log (undef, "knob " . $_);
615 }
616
617
618
619 $main::success = undef;
620
621
622
623 ONELEVEL:
624
625 my $thisround_succeeded = 0;
626 my $thisround_failed = 0;
627 my $thisround_failed_multiple = 0;
628
629 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
630                        or $a <=> $b } @jobstep_todo;
631 my $level = $jobstep[$jobstep_todo[0]]->{level};
632 Log (undef, "start level $level");
633
634
635
636 my %proc;
637 my @freeslot = (0..$#slot);
638 my @holdslot;
639 my %reader;
640 my $progress_is_dirty = 1;
641 my $progress_stats_updated = 0;
642
643 update_progress_stats();
644
645
646
647 THISROUND:
648 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
649 {
650   my $id = $jobstep_todo[$todo_ptr];
651   my $Jobstep = $jobstep[$id];
652   if ($Jobstep->{level} != $level)
653   {
654     next;
655   }
656
657   pipe $reader{$id}, "writer" or croak ($!);
658   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
659   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
660
661   my $childslot = $freeslot[0];
662   my $childnode = $slot[$childslot]->{node};
663   my $childslotname = join (".",
664                             $slot[$childslot]->{node}->{name},
665                             $slot[$childslot]->{cpu});
666   my $childpid = fork();
667   if ($childpid == 0)
668   {
669     $SIG{'INT'} = 'DEFAULT';
670     $SIG{'QUIT'} = 'DEFAULT';
671     $SIG{'TERM'} = 'DEFAULT';
672
673     foreach (values (%reader))
674     {
675       close($_);
676     }
677     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
678     open(STDOUT,">&writer");
679     open(STDERR,">&writer");
680
681     undef $dbh;
682     undef $sth;
683
684     delete $ENV{"GNUPGHOME"};
685     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
686     $ENV{"TASK_QSEQUENCE"} = $id;
687     $ENV{"TASK_SEQUENCE"} = $level;
688     $ENV{"JOB_SCRIPT"} = $Job->{script};
689     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
690       $param =~ tr/a-z/A-Z/;
691       $ENV{"JOB_PARAMETER_$param"} = $value;
692     }
693     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
694     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
695     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
696     $ENV{"HOME"} = $ENV{"TASK_WORK"};
697     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
698     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
699     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
700     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
701
702     $ENV{"GZIP"} = "-n";
703
704     my @srunargs = (
705       "srun",
706       "--nodelist=".$childnode->{name},
707       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
708       "--job-name=$job_id.$id.$$",
709         );
710     my $build_script_to_send = "";
711     my $command =
712         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
713         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
714         ."&& cd $ENV{CRUNCH_TMP} ";
715     if ($build_script)
716     {
717       $build_script_to_send = $build_script;
718       $command .=
719           "&& perl -";
720     }
721     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
722     if ($docker_hash)
723     {
724       my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
725       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
726       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
727
728       # Dynamically configure the container to use the host system as its
729       # DNS server.  Get the host's global addresses from the ip command,
730       # and turn them into docker --dns options using gawk.
731       $command .=
732           q{$(ip -o address show scope global |
733               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
734
735       # The source tree and $destdir directory (which we have
736       # installed on the worker host) are available in the container,
737       # under the same path.
738       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
739       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
740
741       # For some reason we make arv-mount's mount point appear at
742       # /keep inside the container, instead of using the same path as
743       # the host and expecting the task to pay attention to
744       # $TASK_KEEPMOUNT like we do with everything else.
745       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
746       $ENV{TASK_KEEPMOUNT} = "/keep";
747
748       # TASK_WORK is a plain docker data volume: it starts out empty,
749       # is writable, and persists until no containers use it any
750       # more. We don't use --volumes-from to share it with other
751       # containers: it is only accessible to this task, and it goes
752       # away when this task stops.
753       $command .= "--volume=\Q$ENV{TASK_WORK}\E ";
754
755       # JOB_WORK is also a plain docker data volume for now. TODO:
756       # Share a single JOB_WORK volume across all task containers on a
757       # given worker node, and delete it when the job ends (and, in
758       # case that doesn't work, when the next job starts).
759       $command .= "--volume=\Q$ENV{JOB_WORK}\E ";
760
761       while (my ($env_key, $env_val) = each %ENV)
762       {
763         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
764           $command .= "--env=\Q$env_key=$env_val\E ";
765         }
766       }
767       $command .= "--env=\QHOME=$ENV{HOME}\E ";
768       $command .= "\Q$docker_hash\E ";
769       $command .= "stdbuf --output=0 --error=0 ";
770       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
771     } else {
772       # Non-docker run
773       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
774       $command .= "stdbuf --output=0 --error=0 ";
775       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
776     }
777
778     my @execargs = ('bash', '-c', $command);
779     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
780     # exec() failed, we assume nothing happened.
781     die "srun() failed on build script\n";
782   }
783   close("writer");
784   if (!defined $childpid)
785   {
786     close $reader{$id};
787     delete $reader{$id};
788     next;
789   }
790   shift @freeslot;
791   $proc{$childpid} = { jobstep => $id,
792                        time => time,
793                        slot => $childslot,
794                        jobstepname => "$job_id.$id.$childpid",
795                      };
796   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
797   $slot[$childslot]->{pid} = $childpid;
798
799   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
800   Log ($id, "child $childpid started on $childslotname");
801   $Jobstep->{starttime} = time;
802   $Jobstep->{node} = $childnode->{name};
803   $Jobstep->{slotindex} = $childslot;
804   delete $Jobstep->{stderr};
805   delete $Jobstep->{finishtime};
806
807   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
808   $Jobstep->{'arvados_task'}->save;
809
810   splice @jobstep_todo, $todo_ptr, 1;
811   --$todo_ptr;
812
813   $progress_is_dirty = 1;
814
815   while (!@freeslot
816          ||
817          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
818   {
819     last THISROUND if $main::please_freeze;
820     if ($main::please_info)
821     {
822       $main::please_info = 0;
823       freeze();
824       collate_output();
825       save_meta(1);
826       update_progress_stats();
827     }
828     my $gotsome
829         = readfrompipes ()
830         + reapchildren ();
831     if (!$gotsome)
832     {
833       check_refresh_wanted();
834       check_squeue();
835       update_progress_stats();
836       select (undef, undef, undef, 0.1);
837     }
838     elsif (time - $progress_stats_updated >= 30)
839     {
840       update_progress_stats();
841     }
842     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
843         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
844     {
845       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
846           .($thisround_failed+$thisround_succeeded)
847           .") -- giving up on this round";
848       Log (undef, $message);
849       last THISROUND;
850     }
851
852     # move slots from freeslot to holdslot (or back to freeslot) if necessary
853     for (my $i=$#freeslot; $i>=0; $i--) {
854       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
855         push @holdslot, (splice @freeslot, $i, 1);
856       }
857     }
858     for (my $i=$#holdslot; $i>=0; $i--) {
859       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
860         push @freeslot, (splice @holdslot, $i, 1);
861       }
862     }
863
864     # give up if no nodes are succeeding
865     if (!grep { $_->{node}->{losing_streak} == 0 &&
866                     $_->{node}->{hold_count} < 4 } @slot) {
867       my $message = "Every node has failed -- giving up on this round";
868       Log (undef, $message);
869       last THISROUND;
870     }
871   }
872 }
873
874
875 push @freeslot, splice @holdslot;
876 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
877
878
879 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
880 while (%proc)
881 {
882   if ($main::please_continue) {
883     $main::please_continue = 0;
884     goto THISROUND;
885   }
886   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
887   readfrompipes ();
888   if (!reapchildren())
889   {
890     check_refresh_wanted();
891     check_squeue();
892     update_progress_stats();
893     select (undef, undef, undef, 0.1);
894     killem (keys %proc) if $main::please_freeze;
895   }
896 }
897
898 update_progress_stats();
899 freeze_if_want_freeze();
900
901
902 if (!defined $main::success)
903 {
904   if (@jobstep_todo &&
905       $thisround_succeeded == 0 &&
906       ($thisround_failed == 0 || $thisround_failed > 4))
907   {
908     my $message = "stop because $thisround_failed tasks failed and none succeeded";
909     Log (undef, $message);
910     $main::success = 0;
911   }
912   if (!@jobstep_todo)
913   {
914     $main::success = 1;
915   }
916 }
917
918 goto ONELEVEL if !defined $main::success;
919
920
921 release_allocation();
922 freeze();
923 my $collated_output = &collate_output();
924
925 if (!$collated_output) {
926   Log(undef, "output undef");
927 }
928 else {
929   eval {
930     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
931         or die "failed to get collated manifest: $!";
932     my $orig_manifest_text = '';
933     while (my $manifest_line = <$orig_manifest>) {
934       $orig_manifest_text .= $manifest_line;
935     }
936     my $output = retry_op(sub {
937       $arv->{'collections'}->{'create'}->execute(
938         'collection' => {'manifest_text' => $orig_manifest_text});
939     });
940     Log(undef, "output uuid " . $output->{uuid});
941     Log(undef, "output hash " . $output->{portable_data_hash});
942     $Job->update_attributes('output' => $output->{portable_data_hash});
943   };
944   if ($@) {
945     Log (undef, "Failed to register output manifest: $@");
946   }
947 }
948
949 Log (undef, "finish");
950
951 save_meta();
952
953 my $final_state;
954 if ($collated_output && $main::success) {
955   $final_state = 'Complete';
956 } else {
957   $final_state = 'Failed';
958 }
959 $Job->update_attributes('state' => $final_state);
960
961 exit (($final_state eq 'Complete') ? 0 : 1);
962
963
964
965 sub update_progress_stats
966 {
967   $progress_stats_updated = time;
968   return if !$progress_is_dirty;
969   my ($todo, $done, $running) = (scalar @jobstep_todo,
970                                  scalar @jobstep_done,
971                                  scalar @slot - scalar @freeslot - scalar @holdslot);
972   $Job->{'tasks_summary'} ||= {};
973   $Job->{'tasks_summary'}->{'todo'} = $todo;
974   $Job->{'tasks_summary'}->{'done'} = $done;
975   $Job->{'tasks_summary'}->{'running'} = $running;
976   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
977   Log (undef, "status: $done done, $running running, $todo todo");
978   $progress_is_dirty = 0;
979 }
980
981
982
983 sub reapchildren
984 {
985   my $pid = waitpid (-1, WNOHANG);
986   return 0 if $pid <= 0;
987
988   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
989                   . "."
990                   . $slot[$proc{$pid}->{slot}]->{cpu});
991   my $jobstepid = $proc{$pid}->{jobstep};
992   my $elapsed = time - $proc{$pid}->{time};
993   my $Jobstep = $jobstep[$jobstepid];
994
995   my $childstatus = $?;
996   my $exitvalue = $childstatus >> 8;
997   my $exitinfo = "exit ".exit_status_s($childstatus);
998   $Jobstep->{'arvados_task'}->reload;
999   my $task_success = $Jobstep->{'arvados_task'}->{success};
1000
1001   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1002
1003   if (!defined $task_success) {
1004     # task did not indicate one way or the other --> fail
1005     $Jobstep->{'arvados_task'}->{success} = 0;
1006     $Jobstep->{'arvados_task'}->save;
1007     $task_success = 0;
1008   }
1009
1010   if (!$task_success)
1011   {
1012     my $temporary_fail;
1013     $temporary_fail ||= $Jobstep->{node_fail};
1014     $temporary_fail ||= ($exitvalue == 111);
1015
1016     ++$thisround_failed;
1017     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1018
1019     # Check for signs of a failed or misconfigured node
1020     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1021         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1022       # Don't count this against jobstep failure thresholds if this
1023       # node is already suspected faulty and srun exited quickly
1024       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1025           $elapsed < 5) {
1026         Log ($jobstepid, "blaming failure on suspect node " .
1027              $slot[$proc{$pid}->{slot}]->{node}->{name});
1028         $temporary_fail ||= 1;
1029       }
1030       ban_node_by_slot($proc{$pid}->{slot});
1031     }
1032
1033     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1034                              ++$Jobstep->{'failures'},
1035                              $temporary_fail ? 'temporary ' : 'permanent',
1036                              $elapsed));
1037
1038     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1039       # Give up on this task, and the whole job
1040       $main::success = 0;
1041       $main::please_freeze = 1;
1042     }
1043     # Put this task back on the todo queue
1044     push @jobstep_todo, $jobstepid;
1045     $Job->{'tasks_summary'}->{'failed'}++;
1046   }
1047   else
1048   {
1049     ++$thisround_succeeded;
1050     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1051     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1052     push @jobstep_done, $jobstepid;
1053     Log ($jobstepid, "success in $elapsed seconds");
1054   }
1055   $Jobstep->{exitcode} = $childstatus;
1056   $Jobstep->{finishtime} = time;
1057   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1058   $Jobstep->{'arvados_task'}->save;
1059   process_stderr ($jobstepid, $task_success);
1060   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1061
1062   close $reader{$jobstepid};
1063   delete $reader{$jobstepid};
1064   delete $slot[$proc{$pid}->{slot}]->{pid};
1065   push @freeslot, $proc{$pid}->{slot};
1066   delete $proc{$pid};
1067
1068   if ($task_success) {
1069     # Load new tasks
1070     my $newtask_list = [];
1071     my $newtask_results;
1072     do {
1073       $newtask_results = retry_op(sub {
1074         $arv->{'job_tasks'}->{'list'}->execute(
1075           'where' => {
1076             'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1077           },
1078           'order' => 'qsequence',
1079           'offset' => scalar(@$newtask_list),
1080         );
1081       });
1082       push(@$newtask_list, @{$newtask_results->{items}});
1083     } while (@{$newtask_results->{items}});
1084     foreach my $arvados_task (@$newtask_list) {
1085       my $jobstep = {
1086         'level' => $arvados_task->{'sequence'},
1087         'failures' => 0,
1088         'arvados_task' => $arvados_task
1089       };
1090       push @jobstep, $jobstep;
1091       push @jobstep_todo, $#jobstep;
1092     }
1093   }
1094
1095   $progress_is_dirty = 1;
1096   1;
1097 }
1098
1099 sub check_refresh_wanted
1100 {
1101   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1102   if (@stat && $stat[9] > $latest_refresh) {
1103     $latest_refresh = scalar time;
1104     my $Job2 = retry_op(sub {
1105       $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1106     });
1107     for my $attr ('cancelled_at',
1108                   'cancelled_by_user_uuid',
1109                   'cancelled_by_client_uuid',
1110                   'state') {
1111       $Job->{$attr} = $Job2->{$attr};
1112     }
1113     if ($Job->{'state'} ne "Running") {
1114       if ($Job->{'state'} eq "Cancelled") {
1115         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1116       } else {
1117         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1118       }
1119       $main::success = 0;
1120       $main::please_freeze = 1;
1121     }
1122   }
1123 }
1124
1125 sub check_squeue
1126 {
1127   # return if the kill list was checked <4 seconds ago
1128   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1129   {
1130     return;
1131   }
1132   $squeue_kill_checked = time;
1133
1134   # use killem() on procs whose killtime is reached
1135   for (keys %proc)
1136   {
1137     if (exists $proc{$_}->{killtime}
1138         && $proc{$_}->{killtime} <= time)
1139     {
1140       killem ($_);
1141     }
1142   }
1143
1144   # return if the squeue was checked <60 seconds ago
1145   if (defined $squeue_checked && $squeue_checked > time - 60)
1146   {
1147     return;
1148   }
1149   $squeue_checked = time;
1150
1151   if (!$have_slurm)
1152   {
1153     # here is an opportunity to check for mysterious problems with local procs
1154     return;
1155   }
1156
1157   # get a list of steps still running
1158   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1159   chop @squeue;
1160   if ($squeue[-1] ne "ok")
1161   {
1162     return;
1163   }
1164   pop @squeue;
1165
1166   # which of my jobsteps are running, according to squeue?
1167   my %ok;
1168   foreach (@squeue)
1169   {
1170     if (/^(\d+)\.(\d+) (\S+)/)
1171     {
1172       if ($1 eq $ENV{SLURM_JOBID})
1173       {
1174         $ok{$3} = 1;
1175       }
1176     }
1177   }
1178
1179   # which of my active child procs (>60s old) were not mentioned by squeue?
1180   foreach (keys %proc)
1181   {
1182     if ($proc{$_}->{time} < time - 60
1183         && !exists $ok{$proc{$_}->{jobstepname}}
1184         && !exists $proc{$_}->{killtime})
1185     {
1186       # kill this proc if it hasn't exited in 30 seconds
1187       $proc{$_}->{killtime} = time + 30;
1188     }
1189   }
1190 }
1191
1192
1193 sub release_allocation
1194 {
1195   if ($have_slurm)
1196   {
1197     Log (undef, "release job allocation");
1198     system "scancel $ENV{SLURM_JOBID}";
1199   }
1200 }
1201
1202
1203 sub readfrompipes
1204 {
1205   my $gotsome = 0;
1206   foreach my $job (keys %reader)
1207   {
1208     my $buf;
1209     while (0 < sysread ($reader{$job}, $buf, 8192))
1210     {
1211       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1212       $jobstep[$job]->{stderr} .= $buf;
1213       preprocess_stderr ($job);
1214       if (length ($jobstep[$job]->{stderr}) > 16384)
1215       {
1216         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1217       }
1218       $gotsome = 1;
1219     }
1220   }
1221   return $gotsome;
1222 }
1223
1224
1225 sub preprocess_stderr
1226 {
1227   my $job = shift;
1228
1229   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1230     my $line = $1;
1231     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1232     Log ($job, "stderr $line");
1233     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1234       # whoa.
1235       $main::please_freeze = 1;
1236     }
1237     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1238       $jobstep[$job]->{node_fail} = 1;
1239       ban_node_by_slot($jobstep[$job]->{slotindex});
1240     }
1241   }
1242 }
1243
1244
1245 sub process_stderr
1246 {
1247   my $job = shift;
1248   my $task_success = shift;
1249   preprocess_stderr ($job);
1250
1251   map {
1252     Log ($job, "stderr $_");
1253   } split ("\n", $jobstep[$job]->{stderr});
1254 }
1255
1256 sub fetch_block
1257 {
1258   my $hash = shift;
1259   my ($keep, $child_out, $output_block);
1260
1261   my $cmd = "arv-get \Q$hash\E";
1262   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1263   $output_block = '';
1264   while (1) {
1265     my $buf;
1266     my $bytes = sysread($keep, $buf, 1024 * 1024);
1267     if (!defined $bytes) {
1268       die "reading from arv-get: $!";
1269     } elsif ($bytes == 0) {
1270       # sysread returns 0 at the end of the pipe.
1271       last;
1272     } else {
1273       # some bytes were read into buf.
1274       $output_block .= $buf;
1275     }
1276   }
1277   close $keep;
1278   return $output_block;
1279 }
1280
1281 sub collate_output
1282 {
1283   Log (undef, "collate");
1284
1285   my ($child_out, $child_in);
1286   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1287                   '--retries', retry_count());
1288   my $joboutput;
1289   for (@jobstep)
1290   {
1291     next if (!exists $_->{'arvados_task'}->{'output'} ||
1292              !$_->{'arvados_task'}->{'success'});
1293     my $output = $_->{'arvados_task'}->{output};
1294     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1295     {
1296       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1297       print $child_in $output;
1298     }
1299     elsif (@jobstep == 1)
1300     {
1301       $joboutput = $output;
1302       last;
1303     }
1304     elsif (defined (my $outblock = fetch_block ($output)))
1305     {
1306       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1307       print $child_in $outblock;
1308     }
1309     else
1310     {
1311       Log (undef, "XXX fetch_block($output) failed XXX");
1312       $main::success = 0;
1313     }
1314   }
1315   $child_in->close;
1316
1317   if (!defined $joboutput) {
1318     my $s = IO::Select->new($child_out);
1319     if ($s->can_read(120)) {
1320       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1321       chomp($joboutput);
1322       # TODO: Ensure exit status == 0.
1323     } else {
1324       Log (undef, "timed out reading from 'arv-put'");
1325     }
1326   }
1327   # TODO: kill $pid instead of waiting, now that we've decided to
1328   # ignore further output.
1329   waitpid($pid, 0);
1330
1331   return $joboutput;
1332 }
1333
1334
1335 sub killem
1336 {
1337   foreach (@_)
1338   {
1339     my $sig = 2;                # SIGINT first
1340     if (exists $proc{$_}->{"sent_$sig"} &&
1341         time - $proc{$_}->{"sent_$sig"} > 4)
1342     {
1343       $sig = 15;                # SIGTERM if SIGINT doesn't work
1344     }
1345     if (exists $proc{$_}->{"sent_$sig"} &&
1346         time - $proc{$_}->{"sent_$sig"} > 4)
1347     {
1348       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1349     }
1350     if (!exists $proc{$_}->{"sent_$sig"})
1351     {
1352       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1353       kill $sig, $_;
1354       select (undef, undef, undef, 0.1);
1355       if ($sig == 2)
1356       {
1357         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1358       }
1359       $proc{$_}->{"sent_$sig"} = time;
1360       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1361     }
1362   }
1363 }
1364
1365
1366 sub fhbits
1367 {
1368   my($bits);
1369   for (@_) {
1370     vec($bits,fileno($_),1) = 1;
1371   }
1372   $bits;
1373 }
1374
1375
1376 # Send log output to Keep via arv-put.
1377 #
1378 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1379 # $log_pipe_pid is the pid of the arv-put subprocess.
1380 #
1381 # The only functions that should access these variables directly are:
1382 #
1383 # log_writer_start($logfilename)
1384 #     Starts an arv-put pipe, reading data on stdin and writing it to
1385 #     a $logfilename file in an output collection.
1386 #
1387 # log_writer_send($txt)
1388 #     Writes $txt to the output log collection.
1389 #
1390 # log_writer_finish()
1391 #     Closes the arv-put pipe and returns the output that it produces.
1392 #
1393 # log_writer_is_active()
1394 #     Returns a true value if there is currently a live arv-put
1395 #     process, false otherwise.
1396 #
1397 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1398
1399 sub log_writer_start($)
1400 {
1401   my $logfilename = shift;
1402   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1403                         'arv-put', '--portable-data-hash',
1404                         '--retries', '3',
1405                         '--filename', $logfilename,
1406                         '-');
1407 }
1408
1409 sub log_writer_send($)
1410 {
1411   my $txt = shift;
1412   print $log_pipe_in $txt;
1413 }
1414
1415 sub log_writer_finish()
1416 {
1417   return unless $log_pipe_pid;
1418
1419   close($log_pipe_in);
1420   my $arv_put_output;
1421
1422   my $s = IO::Select->new($log_pipe_out);
1423   if ($s->can_read(120)) {
1424     sysread($log_pipe_out, $arv_put_output, 1024);
1425     chomp($arv_put_output);
1426   } else {
1427     Log (undef, "timed out reading from 'arv-put'");
1428   }
1429
1430   waitpid($log_pipe_pid, 0);
1431   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1432   if ($?) {
1433     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1434   }
1435
1436   return $arv_put_output;
1437 }
1438
1439 sub log_writer_is_active() {
1440   return $log_pipe_pid;
1441 }
1442
1443 sub Log                         # ($jobstep_id, $logmessage)
1444 {
1445   if ($_[1] =~ /\n/) {
1446     for my $line (split (/\n/, $_[1])) {
1447       Log ($_[0], $line);
1448     }
1449     return;
1450   }
1451   my $fh = select STDERR; $|=1; select $fh;
1452   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1453   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1454   $message .= "\n";
1455   my $datetime;
1456   if (log_writer_is_active() || -t STDERR) {
1457     my @gmtime = gmtime;
1458     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1459                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1460   }
1461   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1462
1463   if (log_writer_is_active()) {
1464     log_writer_send($datetime . " " . $message);
1465   }
1466 }
1467
1468
1469 sub croak
1470 {
1471   my ($package, $file, $line) = caller;
1472   my $message = "@_ at $file line $line\n";
1473   Log (undef, $message);
1474   freeze() if @jobstep_todo;
1475   collate_output() if @jobstep_todo;
1476   cleanup();
1477   save_meta();
1478   die;
1479 }
1480
1481
1482 sub cleanup
1483 {
1484   return unless $Job;
1485   if ($Job->{'state'} eq 'Cancelled') {
1486     $Job->update_attributes('finished_at' => scalar gmtime);
1487   } else {
1488     $Job->update_attributes('state' => 'Failed');
1489   }
1490 }
1491
1492
1493 sub save_meta
1494 {
1495   my $justcheckpoint = shift; # false if this will be the last meta saved
1496   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1497   return unless log_writer_is_active();
1498
1499   my $loglocator = log_writer_finish();
1500   Log (undef, "log manifest is $loglocator");
1501   $Job->{'log'} = $loglocator;
1502   $Job->update_attributes('log', $loglocator);
1503 }
1504
1505
1506 sub freeze_if_want_freeze
1507 {
1508   if ($main::please_freeze)
1509   {
1510     release_allocation();
1511     if (@_)
1512     {
1513       # kill some srun procs before freeze+stop
1514       map { $proc{$_} = {} } @_;
1515       while (%proc)
1516       {
1517         killem (keys %proc);
1518         select (undef, undef, undef, 0.1);
1519         my $died;
1520         while (($died = waitpid (-1, WNOHANG)) > 0)
1521         {
1522           delete $proc{$died};
1523         }
1524       }
1525     }
1526     freeze();
1527     collate_output();
1528     cleanup();
1529     save_meta();
1530     exit 1;
1531   }
1532 }
1533
1534
1535 sub freeze
1536 {
1537   Log (undef, "Freeze not implemented");
1538   return;
1539 }
1540
1541
1542 sub thaw
1543 {
1544   croak ("Thaw not implemented");
1545 }
1546
1547
1548 sub freezequote
1549 {
1550   my $s = shift;
1551   $s =~ s/\\/\\\\/g;
1552   $s =~ s/\n/\\n/g;
1553   return $s;
1554 }
1555
1556
1557 sub freezeunquote
1558 {
1559   my $s = shift;
1560   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1561   return $s;
1562 }
1563
1564
1565 sub srun
1566 {
1567   my $srunargs = shift;
1568   my $execargs = shift;
1569   my $opts = shift || {};
1570   my $stdin = shift;
1571   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1572
1573   my $show_cmd = "@{$args}";
1574   $show_cmd =~ s/(TOKEN\\*=)\S+/${1}[...]/g;
1575   $show_cmd =~ s/\n/ /g;
1576   warn "starting: $show_cmd\n";
1577
1578   if (defined $stdin) {
1579     my $child = open STDIN, "-|";
1580     defined $child or die "no fork: $!";
1581     if ($child == 0) {
1582       print $stdin or die $!;
1583       close STDOUT or die $!;
1584       exit 0;
1585     }
1586   }
1587
1588   return system (@$args) if $opts->{fork};
1589
1590   exec @$args;
1591   warn "ENV size is ".length(join(" ",%ENV));
1592   die "exec failed: $!: @$args";
1593 }
1594
1595
1596 sub ban_node_by_slot {
1597   # Don't start any new jobsteps on this node for 60 seconds
1598   my $slotid = shift;
1599   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1600   $slot[$slotid]->{node}->{hold_count}++;
1601   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1602 }
1603
1604 sub must_lock_now
1605 {
1606   my ($lockfile, $error_message) = @_;
1607   open L, ">", $lockfile or croak("$lockfile: $!");
1608   if (!flock L, LOCK_EX|LOCK_NB) {
1609     croak("Can't lock $lockfile: $error_message\n");
1610   }
1611 }
1612
1613 sub find_docker_image {
1614   # Given a Keep locator, check to see if it contains a Docker image.
1615   # If so, return its stream name and Docker hash.
1616   # If not, return undef for both values.
1617   my $locator = shift;
1618   my ($streamname, $filename);
1619   my $image = retry_op(sub {
1620     $arv->{collections}->{get}->execute(uuid => $locator);
1621   });
1622   if ($image) {
1623     foreach my $line (split(/\n/, $image->{manifest_text})) {
1624       my @tokens = split(/\s+/, $line);
1625       next if (!@tokens);
1626       $streamname = shift(@tokens);
1627       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1628         if (defined($filename)) {
1629           return (undef, undef);  # More than one file in the Collection.
1630         } else {
1631           $filename = (split(/:/, $filedata, 3))[2];
1632         }
1633       }
1634     }
1635   }
1636   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1637     return ($streamname, $1);
1638   } else {
1639     return (undef, undef);
1640   }
1641 }
1642
1643 sub retry_count {
1644   # Calculate the number of times an operation should be retried,
1645   # assuming exponential backoff, and that we're willing to retry as
1646   # long as tasks have been running.  Enforce a minimum of 3 retries.
1647   my ($starttime, $endtime, $timediff, $retries);
1648   if (@jobstep) {
1649     $starttime = $jobstep[0]->{starttime};
1650     $endtime = $jobstep[-1]->{finishtime};
1651   }
1652   if (!defined($starttime)) {
1653     $timediff = 0;
1654   } elsif (!defined($endtime)) {
1655     $timediff = time - $starttime;
1656   } else {
1657     $timediff = ($endtime - $starttime) - (time - $endtime);
1658   }
1659   if ($timediff > 0) {
1660     $retries = int(log($timediff) / log(2));
1661   } else {
1662     $retries = 1;  # Use the minimum.
1663   }
1664   return ($retries > 3) ? $retries : 3;
1665 }
1666
1667 sub retry_op {
1668   # Given a function reference, call it with the remaining arguments.  If
1669   # it dies, retry it with exponential backoff until it succeeds, or until
1670   # the current retry_count is exhausted.
1671   my $operation = shift;
1672   my $retries = retry_count();
1673   foreach my $try_count (0..$retries) {
1674     my $next_try = time + (2 ** $try_count);
1675     my $result = eval { $operation->(@_); };
1676     if (!$@) {
1677       return $result;
1678     } elsif ($try_count < $retries) {
1679       my $sleep_time = $next_try - time;
1680       sleep($sleep_time) if ($sleep_time > 0);
1681     }
1682   }
1683   # Ensure the error message ends in a newline, so Perl doesn't add
1684   # retry_op's line number to it.
1685   chomp($@);
1686   die($@ . "\n");
1687 }
1688
1689 sub exit_status_s {
1690   # Given a $?, return a human-readable exit code string like "0" or
1691   # "1" or "0 with signal 1" or "1 with signal 11".
1692   my $exitcode = shift;
1693   my $s = $exitcode >> 8;
1694   if ($exitcode & 0x7f) {
1695     $s .= " with signal " . ($exitcode & 0x7f);
1696   }
1697   if ($exitcode & 0x80) {
1698     $s .= " with core dump";
1699   }
1700   return $s;
1701 }
1702
1703 __DATA__
1704 #!/usr/bin/perl
1705
1706 # checkout-and-build
1707
1708 use Fcntl ':flock';
1709 use File::Path qw( make_path remove_tree );
1710
1711 my $destdir = $ENV{"CRUNCH_SRC"};
1712 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1713 my $repo = $ENV{"CRUNCH_SRC_URL"};
1714 my $task_work = $ENV{"TASK_WORK"};
1715
1716 for my $dir ($destdir, $task_work) {
1717   if ($dir) {
1718     make_path $dir;
1719     -e $dir or die "Failed to create temporary directory ($dir): $!";
1720   }
1721 }
1722
1723 if ($task_work) {
1724   remove_tree($task_work, {keep_root => 1});
1725 }
1726
1727
1728 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1729 flock L, LOCK_EX;
1730 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1731     if (@ARGV) {
1732         exec(@ARGV);
1733         die "Cannot exec `@ARGV`: $!";
1734     } else {
1735         exit 0;
1736     }
1737 }
1738
1739 unlink "$destdir.commit";
1740 open STDERR_ORIG, ">&STDERR";
1741 open STDOUT, ">", "$destdir.log";
1742 open STDERR, ">&STDOUT";
1743
1744 mkdir $destdir;
1745 my @git_archive_data = <DATA>;
1746 if (@git_archive_data) {
1747   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1748   print TARX @git_archive_data;
1749   if(!close(TARX)) {
1750     die "'tar -C $destdir -xf -' exited $?: $!";
1751   }
1752 }
1753
1754 my $pwd;
1755 chomp ($pwd = `pwd`);
1756 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1757 mkdir $install_dir;
1758
1759 for my $src_path ("$destdir/arvados/sdk/python") {
1760   if (-d $src_path) {
1761     shell_or_die ("virtualenv", $install_dir);
1762     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1763   }
1764 }
1765
1766 if (-e "$destdir/crunch_scripts/install") {
1767     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1768 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1769     # Old version
1770     shell_or_die ("./tests/autotests.sh", $install_dir);
1771 } elsif (-e "./install.sh") {
1772     shell_or_die ("./install.sh", $install_dir);
1773 }
1774
1775 if ($commit) {
1776     unlink "$destdir.commit.new";
1777     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1778     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1779 }
1780
1781 close L;
1782
1783 if (@ARGV) {
1784     exec(@ARGV);
1785     die "Cannot exec `@ARGV`: $!";
1786 } else {
1787     exit 0;
1788 }
1789
1790 sub shell_or_die
1791 {
1792   if ($ENV{"DEBUG"}) {
1793     print STDERR "@_\n";
1794   }
1795   if (system (@_) != 0) {
1796     my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
1797     open STDERR, ">&STDERR_ORIG";
1798     system ("cat $destdir.log >&2");
1799     die "@_ failed ($!): $exitstatus";
1800   }
1801 }
1802
1803 __DATA__