Merge branch 'master' into 3586-job-priority
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 use constant EX_TEMPFAIL => 75;
88
89 $ENV{"TMPDIR"} ||= "/tmp";
90 unless (defined $ENV{"CRUNCH_TMP"}) {
91   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
92   if ($ENV{"USER"} ne "crunch" && $< != 0) {
93     # use a tmp dir unique for my uid
94     $ENV{"CRUNCH_TMP"} .= "-$<";
95   }
96 }
97
98 # Create the tmp directory if it does not exist
99 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
100   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 }
102
103 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
104 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
105 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
106 mkdir ($ENV{"JOB_WORK"});
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # If some other crunch-job process has grabbed this job (or we see
156     # other evidence that the job is already underway) we exit
157     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158     # mark the job as failed.
159     if ($Job->{'is_locked_by_uuid'}) {
160       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
161       exit EX_TEMPFAIL;
162     }
163     if ($Job->{'success'} ne undef) {
164       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
165       exit EX_TEMPFAIL;
166     }
167     if ($Job->{'running'}) {
168       Log(undef, "Job 'running' flag is already set");
169       exit EX_TEMPFAIL;
170     }
171     if ($Job->{'started_at'}) {
172       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
173       exit EX_TEMPFAIL;
174     }
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189
190   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191
192   $job_has_uuid = 1;
193 }
194 $job_id = $Job->{'uuid'};
195
196 my $keep_logfile = $job_id . '.log.txt';
197 $local_logfile = File::Temp->new();
198
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202
203
204 Log (undef, "check slurm allocation");
205 my @slot;
206 my @node;
207 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my @sinfo;
209 if (!$have_slurm)
210 {
211   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
212   push @sinfo, "$localcpus localhost";
213 }
214 if (exists $ENV{SLURM_NODELIST})
215 {
216   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 }
218 foreach (@sinfo)
219 {
220   my ($ncpus, $slurm_nodelist) = split;
221   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
222
223   my @nodelist;
224   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
225   {
226     my $nodelist = $1;
227     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
228     {
229       my $ranges = $1;
230       foreach (split (",", $ranges))
231       {
232         my ($a, $b);
233         if (/(\d+)-(\d+)/)
234         {
235           $a = $1;
236           $b = $2;
237         }
238         else
239         {
240           $a = $_;
241           $b = $_;
242         }
243         push @nodelist, map {
244           my $n = $nodelist;
245           $n =~ s/\[[-,\d]+\]/$_/;
246           $n;
247         } ($a..$b);
248       }
249     }
250     else
251     {
252       push @nodelist, $nodelist;
253     }
254   }
255   foreach my $nodename (@nodelist)
256   {
257     Log (undef, "node $nodename - $ncpus slots");
258     my $node = { name => $nodename,
259                  ncpus => $ncpus,
260                  losing_streak => 0,
261                  hold_until => 0 };
262     foreach my $cpu (1..$ncpus)
263     {
264       push @slot, { node => $node,
265                     cpu => $cpu };
266     }
267   }
268   push @node, @nodelist;
269 }
270
271
272
273 # Ensure that we get one jobstep running on each allocated node before
274 # we start overloading nodes with concurrent steps
275
276 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
277
278
279
280 my $jobmanager_id;
281 if ($job_has_uuid)
282 {
283   # Claim this job, and make sure nobody else does
284   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
285           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
286     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
287     exit EX_TEMPFAIL;
288   }
289   $Job->update_attributes('started_at' => scalar gmtime,
290                           'running' => 1,
291                           'success' => undef,
292                           'tasks_summary' => { 'failed' => 0,
293                                                'todo' => 1,
294                                                'running' => 0,
295                                                'done' => 0 });
296 }
297
298
299 Log (undef, "start");
300 $SIG{'INT'} = sub { $main::please_freeze = 1; };
301 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
302 $SIG{'TERM'} = \&croak;
303 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
304 $SIG{'ALRM'} = sub { $main::please_info = 1; };
305 $SIG{'CONT'} = sub { $main::please_continue = 1; };
306 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
307
308 $main::please_freeze = 0;
309 $main::please_info = 0;
310 $main::please_continue = 0;
311 $main::please_refresh = 0;
312 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
313
314 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
315 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
316 $ENV{"JOB_UUID"} = $job_id;
317
318
319 my @jobstep;
320 my @jobstep_todo = ();
321 my @jobstep_done = ();
322 my @jobstep_tomerge = ();
323 my $jobstep_tomerge_level = 0;
324 my $squeue_checked;
325 my $squeue_kill_checked;
326 my $output_in_keep = 0;
327 my $latest_refresh = scalar time;
328
329
330
331 if (defined $Job->{thawedfromkey})
332 {
333   thaw ($Job->{thawedfromkey});
334 }
335 else
336 {
337   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
338     'job_uuid' => $Job->{'uuid'},
339     'sequence' => 0,
340     'qsequence' => 0,
341     'parameters' => {},
342                                                           });
343   push @jobstep, { 'level' => 0,
344                    'failures' => 0,
345                    'arvados_task' => $first_task,
346                  };
347   push @jobstep_todo, 0;
348 }
349
350
351 if (!$have_slurm)
352 {
353   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
354 }
355
356
357 my $build_script;
358
359
360 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
361
362 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
363 if ($skip_install)
364 {
365   if (!defined $no_clear_tmp) {
366     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
367     system($clear_tmp_cmd) == 0
368         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
369   }
370   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
371   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
372     if (-d $src_path) {
373       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
374           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
375       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
376           == 0
377           or croak ("setup.py in $src_path failed: exit ".($?>>8));
378     }
379   }
380 }
381 else
382 {
383   do {
384     local $/ = undef;
385     $build_script = <DATA>;
386   };
387   Log (undef, "Install revision ".$Job->{script_version});
388   my $nodelist = join(",", @node);
389
390   if (!defined $no_clear_tmp) {
391     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
392
393     my $cleanpid = fork();
394     if ($cleanpid == 0)
395     {
396       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
397             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
398       exit (1);
399     }
400     while (1)
401     {
402       last if $cleanpid == waitpid (-1, WNOHANG);
403       freeze_if_want_freeze ($cleanpid);
404       select (undef, undef, undef, 0.1);
405     }
406     Log (undef, "Clean-work-dir exited $?");
407   }
408
409   # Install requested code version
410
411   my @execargs;
412   my @srunargs = ("srun",
413                   "--nodelist=$nodelist",
414                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
415
416   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
417   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
418
419   my $commit;
420   my $git_archive;
421   my $treeish = $Job->{'script_version'};
422
423   # If we're running under crunch-dispatch, it will have pulled the
424   # appropriate source tree into its own repository, and given us that
425   # repo's path as $git_dir. If we're running a "local" job, and a
426   # script_version was specified, it's up to the user to provide the
427   # full path to a local repository in Job->{repository}.
428   #
429   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
430   # git-archive --remote where appropriate.
431   #
432   # TODO: Accept a locally-hosted Arvados repository by name or
433   # UUID. Use arvados.v1.repositories.list or .get to figure out the
434   # appropriate fetch-url.
435   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
436
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   if (-d "$repo/.git") {
440     # We were given a working directory, but we are only interested in
441     # the index.
442     $repo = "$repo/.git";
443   }
444
445   # If this looks like a subversion r#, look for it in git-svn commit messages
446
447   if ($treeish =~ m{^\d{1,4}$}) {
448     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
449     chomp $gitlog;
450     if ($gitlog =~ /^[a-f0-9]{40}$/) {
451       $commit = $gitlog;
452       Log (undef, "Using commit $commit for script_version $treeish");
453     }
454   }
455
456   # If that didn't work, try asking git to look it up as a tree-ish.
457
458   if (!defined $commit) {
459     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
460     chomp $found;
461     if ($found =~ /^[0-9a-f]{40}$/s) {
462       $commit = $found;
463       if ($commit ne $treeish) {
464         # Make sure we record the real commit id in the database,
465         # frozentokey, logs, etc. -- instead of an abbreviation or a
466         # branch name which can become ambiguous or point to a
467         # different commit in the future.
468         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
469         Log (undef, "Using commit $commit for tree-ish $treeish");
470         if ($commit ne $treeish) {
471           $Job->{'script_version'} = $commit;
472           !$job_has_uuid or
473               $Job->update_attributes('script_version' => $commit) or
474               croak("Error while updating job");
475         }
476       }
477     }
478   }
479
480   if (defined $commit) {
481     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482     @execargs = ("sh", "-c",
483                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485   }
486   else {
487     croak ("could not figure out commit id for $treeish");
488   }
489
490   # Note: this section is almost certainly unnecessary if we're
491   # running tasks in docker containers.
492   my $installpid = fork();
493   if ($installpid == 0)
494   {
495     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
496     exit (1);
497   }
498   while (1)
499   {
500     last if $installpid == waitpid (-1, WNOHANG);
501     freeze_if_want_freeze ($installpid);
502     select (undef, undef, undef, 0.1);
503   }
504   Log (undef, "Install exited $?");
505 }
506
507 if (!$have_slurm)
508 {
509   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
510   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
511 }
512
513 # If this job requires a Docker image, install that.
514 my $docker_bin = "/usr/bin/docker.io";
515 my ($docker_locator, $docker_stream, $docker_hash);
516 if ($docker_locator = $Job->{docker_image_locator}) {
517   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
518   if (!$docker_hash)
519   {
520     croak("No Docker image hash found from locator $docker_locator");
521   }
522   $docker_stream =~ s/^\.//;
523   my $docker_install_script = qq{
524 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
525     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
526 fi
527 };
528   my $docker_pid = fork();
529   if ($docker_pid == 0)
530   {
531     srun (["srun", "--nodelist=" . join(',', @node)],
532           ["/bin/sh", "-ec", $docker_install_script]);
533     exit ($?);
534   }
535   while (1)
536   {
537     last if $docker_pid == waitpid (-1, WNOHANG);
538     freeze_if_want_freeze ($docker_pid);
539     select (undef, undef, undef, 0.1);
540   }
541   if ($? != 0)
542   {
543     croak("Installing Docker image from $docker_locator returned exit code $?");
544   }
545 }
546
547 foreach (qw (script script_version script_parameters runtime_constraints))
548 {
549   Log (undef,
550        "$_ " .
551        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
552 }
553 foreach (split (/\n/, $Job->{knobs}))
554 {
555   Log (undef, "knob " . $_);
556 }
557
558
559
560 $main::success = undef;
561
562
563
564 ONELEVEL:
565
566 my $thisround_succeeded = 0;
567 my $thisround_failed = 0;
568 my $thisround_failed_multiple = 0;
569
570 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
571                        or $a <=> $b } @jobstep_todo;
572 my $level = $jobstep[$jobstep_todo[0]]->{level};
573 Log (undef, "start level $level");
574
575
576
577 my %proc;
578 my @freeslot = (0..$#slot);
579 my @holdslot;
580 my %reader;
581 my $progress_is_dirty = 1;
582 my $progress_stats_updated = 0;
583
584 update_progress_stats();
585
586
587
588 THISROUND:
589 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
590 {
591   my $id = $jobstep_todo[$todo_ptr];
592   my $Jobstep = $jobstep[$id];
593   if ($Jobstep->{level} != $level)
594   {
595     next;
596   }
597
598   pipe $reader{$id}, "writer" or croak ($!);
599   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
600   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
601
602   my $childslot = $freeslot[0];
603   my $childnode = $slot[$childslot]->{node};
604   my $childslotname = join (".",
605                             $slot[$childslot]->{node}->{name},
606                             $slot[$childslot]->{cpu});
607   my $childpid = fork();
608   if ($childpid == 0)
609   {
610     $SIG{'INT'} = 'DEFAULT';
611     $SIG{'QUIT'} = 'DEFAULT';
612     $SIG{'TERM'} = 'DEFAULT';
613
614     foreach (values (%reader))
615     {
616       close($_);
617     }
618     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
619     open(STDOUT,">&writer");
620     open(STDERR,">&writer");
621
622     undef $dbh;
623     undef $sth;
624
625     delete $ENV{"GNUPGHOME"};
626     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
627     $ENV{"TASK_QSEQUENCE"} = $id;
628     $ENV{"TASK_SEQUENCE"} = $level;
629     $ENV{"JOB_SCRIPT"} = $Job->{script};
630     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
631       $param =~ tr/a-z/A-Z/;
632       $ENV{"JOB_PARAMETER_$param"} = $value;
633     }
634     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
635     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
636     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
637     $ENV{"HOME"} = $ENV{"TASK_WORK"};
638     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
639     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
640     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
641     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
642
643     $ENV{"GZIP"} = "-n";
644
645     my @srunargs = (
646       "srun",
647       "--nodelist=".$childnode->{name},
648       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
649       "--job-name=$job_id.$id.$$",
650         );
651     my $build_script_to_send = "";
652     my $command =
653         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
654         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
655         ."&& cd $ENV{CRUNCH_TMP} ";
656     if ($build_script)
657     {
658       $build_script_to_send = $build_script;
659     }
660     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
661     if ($docker_hash)
662     {
663       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665       # Dynamically configure the container to use the host system as its
666       # DNS server.  Get the host's global addresses from the ip command,
667       # and turn them into docker --dns options using gawk.
668       $command .=
669           q{$(ip -o address show scope global |
670               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
672       $command .= "--env=\QHOME=/home/crunch\E ";
673       while (my ($env_key, $env_val) = each %ENV)
674       {
675         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
676           if ($env_key eq "TASK_KEEPMOUNT") {
677             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
678           }
679           else {
680             $command .= "--env=\Q$env_key=$env_val\E ";
681           }
682         }
683       }
684       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
685       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
686       $command .= "\Q$docker_hash\E ";
687       $command .= "stdbuf --output=0 --error=0 ";
688       $command .= "perl - " if ($build_script);
689       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
690     } else {
691       # Non-docker run
692       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
693       $command .= "stdbuf --output=0 --error=0 ";
694       $command .= "perl - " if ($build_script);
695       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
696     }
697
698     my @execargs = ('bash', '-c', $command);
699     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
700     # exec() failed, we assume nothing happened.
701     Log(undef, "srun() failed on build script");
702     die;
703   }
704   close("writer");
705   if (!defined $childpid)
706   {
707     close $reader{$id};
708     delete $reader{$id};
709     next;
710   }
711   shift @freeslot;
712   $proc{$childpid} = { jobstep => $id,
713                        time => time,
714                        slot => $childslot,
715                        jobstepname => "$job_id.$id.$childpid",
716                      };
717   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
718   $slot[$childslot]->{pid} = $childpid;
719
720   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
721   Log ($id, "child $childpid started on $childslotname");
722   $Jobstep->{starttime} = time;
723   $Jobstep->{node} = $childnode->{name};
724   $Jobstep->{slotindex} = $childslot;
725   delete $Jobstep->{stderr};
726   delete $Jobstep->{finishtime};
727
728   splice @jobstep_todo, $todo_ptr, 1;
729   --$todo_ptr;
730
731   $progress_is_dirty = 1;
732
733   while (!@freeslot
734          ||
735          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
736   {
737     last THISROUND if $main::please_freeze;
738     if ($main::please_info)
739     {
740       $main::please_info = 0;
741       freeze();
742       collate_output();
743       save_meta(1);
744       update_progress_stats();
745     }
746     my $gotsome
747         = readfrompipes ()
748         + reapchildren ();
749     if (!$gotsome)
750     {
751       check_refresh_wanted();
752       check_squeue();
753       update_progress_stats();
754       select (undef, undef, undef, 0.1);
755     }
756     elsif (time - $progress_stats_updated >= 30)
757     {
758       update_progress_stats();
759     }
760     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
761         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
762     {
763       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
764           .($thisround_failed+$thisround_succeeded)
765           .") -- giving up on this round";
766       Log (undef, $message);
767       last THISROUND;
768     }
769
770     # move slots from freeslot to holdslot (or back to freeslot) if necessary
771     for (my $i=$#freeslot; $i>=0; $i--) {
772       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
773         push @holdslot, (splice @freeslot, $i, 1);
774       }
775     }
776     for (my $i=$#holdslot; $i>=0; $i--) {
777       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
778         push @freeslot, (splice @holdslot, $i, 1);
779       }
780     }
781
782     # give up if no nodes are succeeding
783     if (!grep { $_->{node}->{losing_streak} == 0 &&
784                     $_->{node}->{hold_count} < 4 } @slot) {
785       my $message = "Every node has failed -- giving up on this round";
786       Log (undef, $message);
787       last THISROUND;
788     }
789   }
790 }
791
792
793 push @freeslot, splice @holdslot;
794 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
795
796
797 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
798 while (%proc)
799 {
800   if ($main::please_continue) {
801     $main::please_continue = 0;
802     goto THISROUND;
803   }
804   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
805   readfrompipes ();
806   if (!reapchildren())
807   {
808     check_refresh_wanted();
809     check_squeue();
810     update_progress_stats();
811     select (undef, undef, undef, 0.1);
812     killem (keys %proc) if $main::please_freeze;
813   }
814 }
815
816 update_progress_stats();
817 freeze_if_want_freeze();
818
819
820 if (!defined $main::success)
821 {
822   if (@jobstep_todo &&
823       $thisround_succeeded == 0 &&
824       ($thisround_failed == 0 || $thisround_failed > 4))
825   {
826     my $message = "stop because $thisround_failed tasks failed and none succeeded";
827     Log (undef, $message);
828     $main::success = 0;
829   }
830   if (!@jobstep_todo)
831   {
832     $main::success = 1;
833   }
834 }
835
836 goto ONELEVEL if !defined $main::success;
837
838
839 release_allocation();
840 freeze();
841 my $collated_output = &collate_output();
842
843 if ($job_has_uuid) {
844   $Job->update_attributes('running' => 0,
845                           'success' => $collated_output && $main::success,
846                           'finished_at' => scalar gmtime)
847 }
848
849 if (!$collated_output) {
850   Log(undef, "output undef");
851 }
852 else {
853   eval {
854     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
855         or die "failed to get collated manifest: $!";
856     my $orig_manifest_text = '';
857     while (my $manifest_line = <$orig_manifest>) {
858       $orig_manifest_text .= $manifest_line;
859     }
860     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
861       'manifest_text' => $orig_manifest_text,
862     });
863     Log(undef, "output uuid " . $output->{uuid});
864     Log(undef, "output hash " . $output->{portable_data_hash});
865     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
866   };
867   if ($@) {
868     Log (undef, "Failed to register output manifest: $@");
869   }
870 }
871
872 Log (undef, "finish");
873
874 save_meta();
875 exit ($Job->{'success'} ? 1 : 0);
876
877
878
879 sub update_progress_stats
880 {
881   $progress_stats_updated = time;
882   return if !$progress_is_dirty;
883   my ($todo, $done, $running) = (scalar @jobstep_todo,
884                                  scalar @jobstep_done,
885                                  scalar @slot - scalar @freeslot - scalar @holdslot);
886   $Job->{'tasks_summary'} ||= {};
887   $Job->{'tasks_summary'}->{'todo'} = $todo;
888   $Job->{'tasks_summary'}->{'done'} = $done;
889   $Job->{'tasks_summary'}->{'running'} = $running;
890   if ($job_has_uuid) {
891     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
892   }
893   Log (undef, "status: $done done, $running running, $todo todo");
894   $progress_is_dirty = 0;
895 }
896
897
898
899 sub reapchildren
900 {
901   my $pid = waitpid (-1, WNOHANG);
902   return 0 if $pid <= 0;
903
904   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
905                   . "."
906                   . $slot[$proc{$pid}->{slot}]->{cpu});
907   my $jobstepid = $proc{$pid}->{jobstep};
908   my $elapsed = time - $proc{$pid}->{time};
909   my $Jobstep = $jobstep[$jobstepid];
910
911   my $childstatus = $?;
912   my $exitvalue = $childstatus >> 8;
913   my $exitinfo = sprintf("exit %d signal %d%s",
914                          $exitvalue,
915                          $childstatus & 127,
916                          ($childstatus & 128 ? ' core dump' : ''));
917   $Jobstep->{'arvados_task'}->reload;
918   my $task_success = $Jobstep->{'arvados_task'}->{success};
919
920   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
921
922   if (!defined $task_success) {
923     # task did not indicate one way or the other --> fail
924     $Jobstep->{'arvados_task'}->{success} = 0;
925     $Jobstep->{'arvados_task'}->save;
926     $task_success = 0;
927   }
928
929   if (!$task_success)
930   {
931     my $temporary_fail;
932     $temporary_fail ||= $Jobstep->{node_fail};
933     $temporary_fail ||= ($exitvalue == 111);
934
935     ++$thisround_failed;
936     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
937
938     # Check for signs of a failed or misconfigured node
939     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
940         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
941       # Don't count this against jobstep failure thresholds if this
942       # node is already suspected faulty and srun exited quickly
943       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
944           $elapsed < 5) {
945         Log ($jobstepid, "blaming failure on suspect node " .
946              $slot[$proc{$pid}->{slot}]->{node}->{name});
947         $temporary_fail ||= 1;
948       }
949       ban_node_by_slot($proc{$pid}->{slot});
950     }
951
952     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
953                              ++$Jobstep->{'failures'},
954                              $temporary_fail ? 'temporary ' : 'permanent',
955                              $elapsed));
956
957     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
958       # Give up on this task, and the whole job
959       $main::success = 0;
960       $main::please_freeze = 1;
961     }
962     else {
963       # Put this task back on the todo queue
964       push @jobstep_todo, $jobstepid;
965     }
966     $Job->{'tasks_summary'}->{'failed'}++;
967   }
968   else
969   {
970     ++$thisround_succeeded;
971     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
972     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
973     push @jobstep_done, $jobstepid;
974     Log ($jobstepid, "success in $elapsed seconds");
975   }
976   $Jobstep->{exitcode} = $childstatus;
977   $Jobstep->{finishtime} = time;
978   process_stderr ($jobstepid, $task_success);
979   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
980
981   close $reader{$jobstepid};
982   delete $reader{$jobstepid};
983   delete $slot[$proc{$pid}->{slot}]->{pid};
984   push @freeslot, $proc{$pid}->{slot};
985   delete $proc{$pid};
986
987   if ($task_success) {
988     # Load new tasks
989     my $newtask_list = [];
990     my $newtask_results;
991     do {
992       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
993         'where' => {
994           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
995         },
996         'order' => 'qsequence',
997         'offset' => scalar(@$newtask_list),
998       );
999       push(@$newtask_list, @{$newtask_results->{items}});
1000     } while (@{$newtask_results->{items}});
1001     foreach my $arvados_task (@$newtask_list) {
1002       my $jobstep = {
1003         'level' => $arvados_task->{'sequence'},
1004         'failures' => 0,
1005         'arvados_task' => $arvados_task
1006       };
1007       push @jobstep, $jobstep;
1008       push @jobstep_todo, $#jobstep;
1009     }
1010   }
1011
1012   $progress_is_dirty = 1;
1013   1;
1014 }
1015
1016 sub check_refresh_wanted
1017 {
1018   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1019   if (@stat && $stat[9] > $latest_refresh) {
1020     $latest_refresh = scalar time;
1021     if ($job_has_uuid) {
1022       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1023       for my $attr ('cancelled_at',
1024                     'cancelled_by_user_uuid',
1025                     'cancelled_by_client_uuid') {
1026         $Job->{$attr} = $Job2->{$attr};
1027       }
1028       if ($Job->{'cancelled_at'}) {
1029         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1030              " by user " . $Job->{cancelled_by_user_uuid});
1031         $main::success = 0;
1032         $main::please_freeze = 1;
1033       }
1034     }
1035   }
1036 }
1037
1038 sub check_squeue
1039 {
1040   # return if the kill list was checked <4 seconds ago
1041   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1042   {
1043     return;
1044   }
1045   $squeue_kill_checked = time;
1046
1047   # use killem() on procs whose killtime is reached
1048   for (keys %proc)
1049   {
1050     if (exists $proc{$_}->{killtime}
1051         && $proc{$_}->{killtime} <= time)
1052     {
1053       killem ($_);
1054     }
1055   }
1056
1057   # return if the squeue was checked <60 seconds ago
1058   if (defined $squeue_checked && $squeue_checked > time - 60)
1059   {
1060     return;
1061   }
1062   $squeue_checked = time;
1063
1064   if (!$have_slurm)
1065   {
1066     # here is an opportunity to check for mysterious problems with local procs
1067     return;
1068   }
1069
1070   # get a list of steps still running
1071   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1072   chop @squeue;
1073   if ($squeue[-1] ne "ok")
1074   {
1075     return;
1076   }
1077   pop @squeue;
1078
1079   # which of my jobsteps are running, according to squeue?
1080   my %ok;
1081   foreach (@squeue)
1082   {
1083     if (/^(\d+)\.(\d+) (\S+)/)
1084     {
1085       if ($1 eq $ENV{SLURM_JOBID})
1086       {
1087         $ok{$3} = 1;
1088       }
1089     }
1090   }
1091
1092   # which of my active child procs (>60s old) were not mentioned by squeue?
1093   foreach (keys %proc)
1094   {
1095     if ($proc{$_}->{time} < time - 60
1096         && !exists $ok{$proc{$_}->{jobstepname}}
1097         && !exists $proc{$_}->{killtime})
1098     {
1099       # kill this proc if it hasn't exited in 30 seconds
1100       $proc{$_}->{killtime} = time + 30;
1101     }
1102   }
1103 }
1104
1105
1106 sub release_allocation
1107 {
1108   if ($have_slurm)
1109   {
1110     Log (undef, "release job allocation");
1111     system "scancel $ENV{SLURM_JOBID}";
1112   }
1113 }
1114
1115
1116 sub readfrompipes
1117 {
1118   my $gotsome = 0;
1119   foreach my $job (keys %reader)
1120   {
1121     my $buf;
1122     while (0 < sysread ($reader{$job}, $buf, 8192))
1123     {
1124       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1125       $jobstep[$job]->{stderr} .= $buf;
1126       preprocess_stderr ($job);
1127       if (length ($jobstep[$job]->{stderr}) > 16384)
1128       {
1129         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1130       }
1131       $gotsome = 1;
1132     }
1133   }
1134   return $gotsome;
1135 }
1136
1137
1138 sub preprocess_stderr
1139 {
1140   my $job = shift;
1141
1142   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1143     my $line = $1;
1144     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1145     Log ($job, "stderr $line");
1146     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1147       # whoa.
1148       $main::please_freeze = 1;
1149     }
1150     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1151       $jobstep[$job]->{node_fail} = 1;
1152       ban_node_by_slot($jobstep[$job]->{slotindex});
1153     }
1154   }
1155 }
1156
1157
1158 sub process_stderr
1159 {
1160   my $job = shift;
1161   my $task_success = shift;
1162   preprocess_stderr ($job);
1163
1164   map {
1165     Log ($job, "stderr $_");
1166   } split ("\n", $jobstep[$job]->{stderr});
1167 }
1168
1169 sub fetch_block
1170 {
1171   my $hash = shift;
1172   my ($keep, $child_out, $output_block);
1173
1174   my $cmd = "arv-get \Q$hash\E";
1175   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1176   $output_block = '';
1177   while (1) {
1178     my $buf;
1179     my $bytes = sysread($keep, $buf, 1024 * 1024);
1180     if (!defined $bytes) {
1181       die "reading from arv-get: $!";
1182     } elsif ($bytes == 0) {
1183       # sysread returns 0 at the end of the pipe.
1184       last;
1185     } else {
1186       # some bytes were read into buf.
1187       $output_block .= $buf;
1188     }
1189   }
1190   close $keep;
1191   return $output_block;
1192 }
1193
1194 sub collate_output
1195 {
1196   Log (undef, "collate");
1197
1198   my ($child_out, $child_in);
1199   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1200   my $joboutput;
1201   for (@jobstep)
1202   {
1203     next if (!exists $_->{'arvados_task'}->{'output'} ||
1204              !$_->{'arvados_task'}->{'success'});
1205     my $output = $_->{'arvados_task'}->{output};
1206     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1207     {
1208       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1209       print $child_in $output;
1210     }
1211     elsif (@jobstep == 1)
1212     {
1213       $joboutput = $output;
1214       last;
1215     }
1216     elsif (defined (my $outblock = fetch_block ($output)))
1217     {
1218       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1219       print $child_in $outblock;
1220     }
1221     else
1222     {
1223       Log (undef, "XXX fetch_block($output) failed XXX");
1224       $main::success = 0;
1225     }
1226   }
1227   $child_in->close;
1228
1229   if (!defined $joboutput) {
1230     my $s = IO::Select->new($child_out);
1231     if ($s->can_read(120)) {
1232       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1233       chomp($joboutput);
1234     } else {
1235       Log (undef, "timed out reading from 'arv-put'");
1236     }
1237   }
1238   waitpid($pid, 0);
1239
1240   return $joboutput;
1241 }
1242
1243
1244 sub killem
1245 {
1246   foreach (@_)
1247   {
1248     my $sig = 2;                # SIGINT first
1249     if (exists $proc{$_}->{"sent_$sig"} &&
1250         time - $proc{$_}->{"sent_$sig"} > 4)
1251     {
1252       $sig = 15;                # SIGTERM if SIGINT doesn't work
1253     }
1254     if (exists $proc{$_}->{"sent_$sig"} &&
1255         time - $proc{$_}->{"sent_$sig"} > 4)
1256     {
1257       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1258     }
1259     if (!exists $proc{$_}->{"sent_$sig"})
1260     {
1261       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1262       kill $sig, $_;
1263       select (undef, undef, undef, 0.1);
1264       if ($sig == 2)
1265       {
1266         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1267       }
1268       $proc{$_}->{"sent_$sig"} = time;
1269       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1270     }
1271   }
1272 }
1273
1274
1275 sub fhbits
1276 {
1277   my($bits);
1278   for (@_) {
1279     vec($bits,fileno($_),1) = 1;
1280   }
1281   $bits;
1282 }
1283
1284
1285 sub Log                         # ($jobstep_id, $logmessage)
1286 {
1287   if ($_[1] =~ /\n/) {
1288     for my $line (split (/\n/, $_[1])) {
1289       Log ($_[0], $line);
1290     }
1291     return;
1292   }
1293   my $fh = select STDERR; $|=1; select $fh;
1294   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1295   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1296   $message .= "\n";
1297   my $datetime;
1298   if ($local_logfile || -t STDERR) {
1299     my @gmtime = gmtime;
1300     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1301                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1302   }
1303   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1304
1305   if ($local_logfile) {
1306     print $local_logfile $datetime . " " . $message;
1307   }
1308 }
1309
1310
1311 sub croak
1312 {
1313   my ($package, $file, $line) = caller;
1314   my $message = "@_ at $file line $line\n";
1315   Log (undef, $message);
1316   freeze() if @jobstep_todo;
1317   collate_output() if @jobstep_todo;
1318   cleanup();
1319   save_meta() if $local_logfile;
1320   die;
1321 }
1322
1323
1324 sub cleanup
1325 {
1326   return if !$job_has_uuid;
1327   $Job->update_attributes('running' => 0,
1328                           'success' => 0,
1329                           'finished_at' => scalar gmtime);
1330 }
1331
1332
1333 sub save_meta
1334 {
1335   my $justcheckpoint = shift; # false if this will be the last meta saved
1336   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1337
1338   $local_logfile->flush;
1339   my $cmd = "arv-put --portable-data-hash --filename ''\Q$keep_logfile\E "
1340       . quotemeta($local_logfile->filename);
1341   my $loglocator = `$cmd`;
1342   die "system $cmd failed: $?" if $?;
1343   chomp($loglocator);
1344
1345   $local_logfile = undef;   # the temp file is automatically deleted
1346   Log (undef, "log manifest is $loglocator");
1347   $Job->{'log'} = $loglocator;
1348   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1349 }
1350
1351
1352 sub freeze_if_want_freeze
1353 {
1354   if ($main::please_freeze)
1355   {
1356     release_allocation();
1357     if (@_)
1358     {
1359       # kill some srun procs before freeze+stop
1360       map { $proc{$_} = {} } @_;
1361       while (%proc)
1362       {
1363         killem (keys %proc);
1364         select (undef, undef, undef, 0.1);
1365         my $died;
1366         while (($died = waitpid (-1, WNOHANG)) > 0)
1367         {
1368           delete $proc{$died};
1369         }
1370       }
1371     }
1372     freeze();
1373     collate_output();
1374     cleanup();
1375     save_meta();
1376     exit 0;
1377   }
1378 }
1379
1380
1381 sub freeze
1382 {
1383   Log (undef, "Freeze not implemented");
1384   return;
1385 }
1386
1387
1388 sub thaw
1389 {
1390   croak ("Thaw not implemented");
1391 }
1392
1393
1394 sub freezequote
1395 {
1396   my $s = shift;
1397   $s =~ s/\\/\\\\/g;
1398   $s =~ s/\n/\\n/g;
1399   return $s;
1400 }
1401
1402
1403 sub freezeunquote
1404 {
1405   my $s = shift;
1406   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1407   return $s;
1408 }
1409
1410
1411 sub srun
1412 {
1413   my $srunargs = shift;
1414   my $execargs = shift;
1415   my $opts = shift || {};
1416   my $stdin = shift;
1417   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1418   print STDERR (join (" ",
1419                       map { / / ? "'$_'" : $_ }
1420                       (@$args)),
1421                 "\n")
1422       if $ENV{CRUNCH_DEBUG};
1423
1424   if (defined $stdin) {
1425     my $child = open STDIN, "-|";
1426     defined $child or die "no fork: $!";
1427     if ($child == 0) {
1428       print $stdin or die $!;
1429       close STDOUT or die $!;
1430       exit 0;
1431     }
1432   }
1433
1434   return system (@$args) if $opts->{fork};
1435
1436   exec @$args;
1437   warn "ENV size is ".length(join(" ",%ENV));
1438   die "exec failed: $!: @$args";
1439 }
1440
1441
1442 sub ban_node_by_slot {
1443   # Don't start any new jobsteps on this node for 60 seconds
1444   my $slotid = shift;
1445   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1446   $slot[$slotid]->{node}->{hold_count}++;
1447   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1448 }
1449
1450 sub must_lock_now
1451 {
1452   my ($lockfile, $error_message) = @_;
1453   open L, ">", $lockfile or croak("$lockfile: $!");
1454   if (!flock L, LOCK_EX|LOCK_NB) {
1455     croak("Can't lock $lockfile: $error_message\n");
1456   }
1457 }
1458
1459 sub find_docker_image {
1460   # Given a Keep locator, check to see if it contains a Docker image.
1461   # If so, return its stream name and Docker hash.
1462   # If not, return undef for both values.
1463   my $locator = shift;
1464   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1465     my @file_list = @{$image->{files}};
1466     if ((scalar(@file_list) == 1) &&
1467         ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1468       return ($file_list[0][0], $1);
1469     }
1470   }
1471   return (undef, undef);
1472 }
1473
1474 __DATA__
1475 #!/usr/bin/perl
1476
1477 # checkout-and-build
1478
1479 use Fcntl ':flock';
1480 use File::Path qw( make_path );
1481
1482 my $destdir = $ENV{"CRUNCH_SRC"};
1483 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1484 my $repo = $ENV{"CRUNCH_SRC_URL"};
1485 my $task_work = $ENV{"TASK_WORK"};
1486
1487 for my $dir ($destdir, $task_work) {
1488     if ($dir) {
1489         make_path $dir;
1490         -e $dir or die "Failed to create temporary directory ($dir): $!";
1491     }
1492 }
1493
1494 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1495 flock L, LOCK_EX;
1496 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1497     if (@ARGV) {
1498         exec(@ARGV);
1499         die "Cannot exec `@ARGV`: $!";
1500     } else {
1501         exit 0;
1502     }
1503 }
1504
1505 unlink "$destdir.commit";
1506 open STDOUT, ">", "$destdir.log";
1507 open STDERR, ">&STDOUT";
1508
1509 mkdir $destdir;
1510 my @git_archive_data = <DATA>;
1511 if (@git_archive_data) {
1512   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1513   print TARX @git_archive_data;
1514   if(!close(TARX)) {
1515     die "'tar -C $destdir -xf -' exited $?: $!";
1516   }
1517 }
1518
1519 my $pwd;
1520 chomp ($pwd = `pwd`);
1521 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1522 mkdir $install_dir;
1523
1524 for my $src_path ("$destdir/arvados/sdk/python") {
1525   if (-d $src_path) {
1526     shell_or_die ("virtualenv", $install_dir);
1527     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1528   }
1529 }
1530
1531 if (-e "$destdir/crunch_scripts/install") {
1532     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1533 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1534     # Old version
1535     shell_or_die ("./tests/autotests.sh", $install_dir);
1536 } elsif (-e "./install.sh") {
1537     shell_or_die ("./install.sh", $install_dir);
1538 }
1539
1540 if ($commit) {
1541     unlink "$destdir.commit.new";
1542     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1543     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1544 }
1545
1546 close L;
1547
1548 if (@ARGV) {
1549     exec(@ARGV);
1550     die "Cannot exec `@ARGV`: $!";
1551 } else {
1552     exit 0;
1553 }
1554
1555 sub shell_or_die
1556 {
1557   if ($ENV{"DEBUG"}) {
1558     print STDERR "@_\n";
1559   }
1560   system (@_) == 0
1561       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1562 }
1563
1564 __DATA__