]> git.arvados.org - arvados.git/blob - sdk/cli/bin/crunch-job
3342: Fix syntax error in pipeline_instance.rb. Restore code in crunch-job
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 use constant EX_TEMPFAIL => 75;
88
89 $ENV{"TMPDIR"} ||= "/tmp";
90 unless (defined $ENV{"CRUNCH_TMP"}) {
91   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
92   if ($ENV{"USER"} ne "crunch" && $< != 0) {
93     # use a tmp dir unique for my uid
94     $ENV{"CRUNCH_TMP"} .= "-$<";
95   }
96 }
97
98 # Create the tmp directory if it does not exist
99 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
100   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 }
102
103 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
104 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
105 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
106 mkdir ($ENV{"JOB_WORK"});
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # If some other crunch-job process has grabbed this job (or we see
156     # other evidence that the job is already underway) we exit
157     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158     # mark the job as failed.
159     if ($Job->{'is_locked_by_uuid'}) {
160       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
161       exit EX_TEMPFAIL;
162     }
163     if ($Job->{'success'} ne undef) {
164       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
165       exit EX_TEMPFAIL;
166     }
167     if ($Job->{'running'}) {
168       Log(undef, "Job 'running' flag is already set");
169       exit EX_TEMPFAIL;
170     }
171     if ($Job->{'started_at'}) {
172       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
173       exit EX_TEMPFAIL;
174     }
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189
190   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191
192   $job_has_uuid = 1;
193 }
194 $job_id = $Job->{'uuid'};
195
196 my $keep_logfile = $job_id . '.log.txt';
197 $local_logfile = File::Temp->new();
198
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202
203
204 Log (undef, "check slurm allocation");
205 my @slot;
206 my @node;
207 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my @sinfo;
209 if (!$have_slurm)
210 {
211   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
212   push @sinfo, "$localcpus localhost";
213 }
214 if (exists $ENV{SLURM_NODELIST})
215 {
216   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 }
218 foreach (@sinfo)
219 {
220   my ($ncpus, $slurm_nodelist) = split;
221   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
222
223   my @nodelist;
224   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
225   {
226     my $nodelist = $1;
227     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
228     {
229       my $ranges = $1;
230       foreach (split (",", $ranges))
231       {
232         my ($a, $b);
233         if (/(\d+)-(\d+)/)
234         {
235           $a = $1;
236           $b = $2;
237         }
238         else
239         {
240           $a = $_;
241           $b = $_;
242         }
243         push @nodelist, map {
244           my $n = $nodelist;
245           $n =~ s/\[[-,\d]+\]/$_/;
246           $n;
247         } ($a..$b);
248       }
249     }
250     else
251     {
252       push @nodelist, $nodelist;
253     }
254   }
255   foreach my $nodename (@nodelist)
256   {
257     Log (undef, "node $nodename - $ncpus slots");
258     my $node = { name => $nodename,
259                  ncpus => $ncpus,
260                  losing_streak => 0,
261                  hold_until => 0 };
262     foreach my $cpu (1..$ncpus)
263     {
264       push @slot, { node => $node,
265                     cpu => $cpu };
266     }
267   }
268   push @node, @nodelist;
269 }
270
271
272
273 # Ensure that we get one jobstep running on each allocated node before
274 # we start overloading nodes with concurrent steps
275
276 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
277
278
279
280 my $jobmanager_id;
281 if ($job_has_uuid)
282 {
283   # Claim this job, and make sure nobody else does
284   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
285           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
286     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
287     exit EX_TEMPFAIL;
288   }
289   $Job->update_attributes('started_at' => scalar gmtime,
290                           'running' => 1,
291                           'success' => undef,
292                           'tasks_summary' => { 'failed' => 0,
293                                                'todo' => 1,
294                                                'running' => 0,
295                                                'done' => 0 });
296 }
297
298
299 Log (undef, "start");
300 $SIG{'INT'} = sub { $main::please_freeze = 1; };
301 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
302 $SIG{'TERM'} = \&croak;
303 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
304 $SIG{'ALRM'} = sub { $main::please_info = 1; };
305 $SIG{'CONT'} = sub { $main::please_continue = 1; };
306 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
307
308 $main::please_freeze = 0;
309 $main::please_info = 0;
310 $main::please_continue = 0;
311 $main::please_refresh = 0;
312 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
313
314 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
315 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
316 $ENV{"JOB_UUID"} = $job_id;
317
318
319 my @jobstep;
320 my @jobstep_todo = ();
321 my @jobstep_done = ();
322 my @jobstep_tomerge = ();
323 my $jobstep_tomerge_level = 0;
324 my $squeue_checked;
325 my $squeue_kill_checked;
326 my $output_in_keep = 0;
327 my $latest_refresh = scalar time;
328
329
330
331 if (defined $Job->{thawedfromkey})
332 {
333   thaw ($Job->{thawedfromkey});
334 }
335 else
336 {
337   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
338     'job_uuid' => $Job->{'uuid'},
339     'sequence' => 0,
340     'qsequence' => 0,
341     'parameters' => {},
342                                                           });
343   push @jobstep, { 'level' => 0,
344                    'failures' => 0,
345                    'arvados_task' => $first_task,
346                  };
347   push @jobstep_todo, 0;
348 }
349
350
351 if (!$have_slurm)
352 {
353   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
354 }
355
356
357 my $build_script;
358
359
360 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
361
362 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
363 if ($skip_install)
364 {
365   if (!defined $no_clear_tmp) {
366     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
367     system($clear_tmp_cmd) == 0
368         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
369   }
370   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
371   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
372     if (-d $src_path) {
373       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
374           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
375       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
376           == 0
377           or croak ("setup.py in $src_path failed: exit ".($?>>8));
378     }
379   }
380 }
381 else
382 {
383   do {
384     local $/ = undef;
385     $build_script = <DATA>;
386   };
387   Log (undef, "Install revision ".$Job->{script_version});
388   my $nodelist = join(",", @node);
389
390   if (!defined $no_clear_tmp) {
391     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
392
393     my $cleanpid = fork();
394     if ($cleanpid == 0)
395     {
396       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
397             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
398       exit (1);
399     }
400     while (1)
401     {
402       last if $cleanpid == waitpid (-1, WNOHANG);
403       freeze_if_want_freeze ($cleanpid);
404       select (undef, undef, undef, 0.1);
405     }
406     Log (undef, "Clean-work-dir exited $?");
407   }
408
409   # Install requested code version
410
411   my @execargs;
412   my @srunargs = ("srun",
413                   "--nodelist=$nodelist",
414                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
415
416   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
417   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
418
419   my $commit;
420   my $git_archive;
421   my $treeish = $Job->{'script_version'};
422
423   # If we're running under crunch-dispatch, it will have pulled the
424   # appropriate source tree into its own repository, and given us that
425   # repo's path as $git_dir. If we're running a "local" job, and a
426   # script_version was specified, it's up to the user to provide the
427   # full path to a local repository in Job->{repository}.
428   #
429   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
430   # git-archive --remote where appropriate.
431   #
432   # TODO: Accept a locally-hosted Arvados repository by name or
433   # UUID. Use arvados.v1.repositories.list or .get to figure out the
434   # appropriate fetch-url.
435   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
436
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   if (-d "$repo/.git") {
440     # We were given a working directory, but we are only interested in
441     # the index.
442     $repo = "$repo/.git";
443   }
444
445   # If this looks like a subversion r#, look for it in git-svn commit messages
446
447   if ($treeish =~ m{^\d{1,4}$}) {
448     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
449     chomp $gitlog;
450     if ($gitlog =~ /^[a-f0-9]{40}$/) {
451       $commit = $gitlog;
452       Log (undef, "Using commit $commit for script_version $treeish");
453     }
454   }
455
456   # If that didn't work, try asking git to look it up as a tree-ish.
457
458   if (!defined $commit) {
459     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
460     chomp $found;
461     if ($found =~ /^[0-9a-f]{40}$/s) {
462       $commit = $found;
463       if ($commit ne $treeish) {
464         # Make sure we record the real commit id in the database,
465         # frozentokey, logs, etc. -- instead of an abbreviation or a
466         # branch name which can become ambiguous or point to a
467         # different commit in the future.
468         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
469         Log (undef, "Using commit $commit for tree-ish $treeish");
470         if ($commit ne $treeish) {
471           $Job->{'script_version'} = $commit;
472           !$job_has_uuid or
473               $Job->update_attributes('script_version' => $commit) or
474               croak("Error while updating job");
475         }
476       }
477     }
478   }
479
480   if (defined $commit) {
481     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482     @execargs = ("sh", "-c",
483                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485   }
486   else {
487     croak ("could not figure out commit id for $treeish");
488   }
489
490   my $installpid = fork();
491   if ($installpid == 0)
492   {
493     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
494     exit (1);
495   }
496   while (1)
497   {
498     last if $installpid == waitpid (-1, WNOHANG);
499     freeze_if_want_freeze ($installpid);
500     select (undef, undef, undef, 0.1);
501   }
502   Log (undef, "Install exited $?");
503 }
504
505 if (!$have_slurm)
506 {
507   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
508   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
509 }
510
511 # If this job requires a Docker image, install that.
512 my $docker_bin = "/usr/bin/docker.io";
513 my ($docker_locator, $docker_stream, $docker_hash);
514 if ($docker_locator = $Job->{docker_image_locator}) {
515   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
516   if (!$docker_hash)
517   {
518     croak("No Docker image hash found from locator $docker_locator");
519   }
520   $docker_stream =~ s/^\.//;
521   my $docker_install_script = qq{
522 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
523     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
524 fi
525 };
526   my $docker_pid = fork();
527   if ($docker_pid == 0)
528   {
529     srun (["srun", "--nodelist=" . join(',', @node)],
530           ["/bin/sh", "-ec", $docker_install_script]);
531     exit ($?);
532   }
533   while (1)
534   {
535     last if $docker_pid == waitpid (-1, WNOHANG);
536     freeze_if_want_freeze ($docker_pid);
537     select (undef, undef, undef, 0.1);
538   }
539   if ($? != 0)
540   {
541     croak("Installing Docker image from $docker_locator returned exit code $?");
542   }
543 }
544
545 foreach (qw (script script_version script_parameters runtime_constraints))
546 {
547   Log (undef,
548        "$_ " .
549        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
550 }
551 foreach (split (/\n/, $Job->{knobs}))
552 {
553   Log (undef, "knob " . $_);
554 }
555
556
557
558 $main::success = undef;
559
560
561
562 ONELEVEL:
563
564 my $thisround_succeeded = 0;
565 my $thisround_failed = 0;
566 my $thisround_failed_multiple = 0;
567
568 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
569                        or $a <=> $b } @jobstep_todo;
570 my $level = $jobstep[$jobstep_todo[0]]->{level};
571 Log (undef, "start level $level");
572
573
574
575 my %proc;
576 my @freeslot = (0..$#slot);
577 my @holdslot;
578 my %reader;
579 my $progress_is_dirty = 1;
580 my $progress_stats_updated = 0;
581
582 update_progress_stats();
583
584
585
586 THISROUND:
587 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
588 {
589   my $id = $jobstep_todo[$todo_ptr];
590   my $Jobstep = $jobstep[$id];
591   if ($Jobstep->{level} != $level)
592   {
593     next;
594   }
595
596   pipe $reader{$id}, "writer" or croak ($!);
597   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
598   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
599
600   my $childslot = $freeslot[0];
601   my $childnode = $slot[$childslot]->{node};
602   my $childslotname = join (".",
603                             $slot[$childslot]->{node}->{name},
604                             $slot[$childslot]->{cpu});
605   my $childpid = fork();
606   if ($childpid == 0)
607   {
608     $SIG{'INT'} = 'DEFAULT';
609     $SIG{'QUIT'} = 'DEFAULT';
610     $SIG{'TERM'} = 'DEFAULT';
611
612     foreach (values (%reader))
613     {
614       close($_);
615     }
616     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
617     open(STDOUT,">&writer");
618     open(STDERR,">&writer");
619
620     undef $dbh;
621     undef $sth;
622
623     delete $ENV{"GNUPGHOME"};
624     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
625     $ENV{"TASK_QSEQUENCE"} = $id;
626     $ENV{"TASK_SEQUENCE"} = $level;
627     $ENV{"JOB_SCRIPT"} = $Job->{script};
628     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
629       $param =~ tr/a-z/A-Z/;
630       $ENV{"JOB_PARAMETER_$param"} = $value;
631     }
632     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
633     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
634     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
635     $ENV{"HOME"} = $ENV{"TASK_WORK"};
636     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
637     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
638     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
639     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
640
641     $ENV{"GZIP"} = "-n";
642
643     my @srunargs = (
644       "srun",
645       "--nodelist=".$childnode->{name},
646       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
647       "--job-name=$job_id.$id.$$",
648         );
649     my $build_script_to_send = "";
650     my $command =
651         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
652         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
653         ."&& cd $ENV{CRUNCH_TMP} ";
654     if ($build_script)
655     {
656       $build_script_to_send = $build_script;
657       $command .=
658           "&& perl -";
659     }
660     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
661     if ($docker_hash)
662     {
663       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665       # Dynamically configure the container to use the host system as its
666       # DNS server.  Get the host's global addresses from the ip command,
667       # and turn them into docker --dns options using gawk.
668       $command .=
669           q{$(ip -o address show scope global |
670               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
672       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
673       $command .= "--env=\QHOME=/home/crunch\E ";
674       while (my ($env_key, $env_val) = each %ENV)
675       {
676         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
677           if ($env_key eq "TASK_KEEPMOUNT") {
678             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
679           }
680           else {
681             $command .= "--env=\Q$env_key=$env_val\E ";
682           }
683         }
684       }
685       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
686       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
687       $command .= "\Q$docker_hash\E ";
688       $command .= "stdbuf --output=0 --error=0 ";
689       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
690     } else {
691       # Non-docker run
692       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
693       $command .= "stdbuf --output=0 --error=0 ";
694       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
695     }
696
697     my @execargs = ('bash', '-c', $command);
698     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
699     # exec() failed, we assume nothing happened.
700     Log(undef, "srun() failed on build script");
701     die;
702   }
703   close("writer");
704   if (!defined $childpid)
705   {
706     close $reader{$id};
707     delete $reader{$id};
708     next;
709   }
710   shift @freeslot;
711   $proc{$childpid} = { jobstep => $id,
712                        time => time,
713                        slot => $childslot,
714                        jobstepname => "$job_id.$id.$childpid",
715                      };
716   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
717   $slot[$childslot]->{pid} = $childpid;
718
719   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
720   Log ($id, "child $childpid started on $childslotname");
721   $Jobstep->{starttime} = time;
722   $Jobstep->{node} = $childnode->{name};
723   $Jobstep->{slotindex} = $childslot;
724   delete $Jobstep->{stderr};
725   delete $Jobstep->{finishtime};
726
727   splice @jobstep_todo, $todo_ptr, 1;
728   --$todo_ptr;
729
730   $progress_is_dirty = 1;
731
732   while (!@freeslot
733          ||
734          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
735   {
736     last THISROUND if $main::please_freeze;
737     if ($main::please_info)
738     {
739       $main::please_info = 0;
740       freeze();
741       collate_output();
742       save_meta(1);
743       update_progress_stats();
744     }
745     my $gotsome
746         = readfrompipes ()
747         + reapchildren ();
748     if (!$gotsome)
749     {
750       check_refresh_wanted();
751       check_squeue();
752       update_progress_stats();
753       select (undef, undef, undef, 0.1);
754     }
755     elsif (time - $progress_stats_updated >= 30)
756     {
757       update_progress_stats();
758     }
759     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
760         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
761     {
762       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
763           .($thisround_failed+$thisround_succeeded)
764           .") -- giving up on this round";
765       Log (undef, $message);
766       last THISROUND;
767     }
768
769     # move slots from freeslot to holdslot (or back to freeslot) if necessary
770     for (my $i=$#freeslot; $i>=0; $i--) {
771       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
772         push @holdslot, (splice @freeslot, $i, 1);
773       }
774     }
775     for (my $i=$#holdslot; $i>=0; $i--) {
776       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
777         push @freeslot, (splice @holdslot, $i, 1);
778       }
779     }
780
781     # give up if no nodes are succeeding
782     if (!grep { $_->{node}->{losing_streak} == 0 &&
783                     $_->{node}->{hold_count} < 4 } @slot) {
784       my $message = "Every node has failed -- giving up on this round";
785       Log (undef, $message);
786       last THISROUND;
787     }
788   }
789 }
790
791
792 push @freeslot, splice @holdslot;
793 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
794
795
796 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
797 while (%proc)
798 {
799   if ($main::please_continue) {
800     $main::please_continue = 0;
801     goto THISROUND;
802   }
803   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
804   readfrompipes ();
805   if (!reapchildren())
806   {
807     check_refresh_wanted();
808     check_squeue();
809     update_progress_stats();
810     select (undef, undef, undef, 0.1);
811     killem (keys %proc) if $main::please_freeze;
812   }
813 }
814
815 update_progress_stats();
816 freeze_if_want_freeze();
817
818
819 if (!defined $main::success)
820 {
821   if (@jobstep_todo &&
822       $thisround_succeeded == 0 &&
823       ($thisround_failed == 0 || $thisround_failed > 4))
824   {
825     my $message = "stop because $thisround_failed tasks failed and none succeeded";
826     Log (undef, $message);
827     $main::success = 0;
828   }
829   if (!@jobstep_todo)
830   {
831     $main::success = 1;
832   }
833 }
834
835 goto ONELEVEL if !defined $main::success;
836
837
838 release_allocation();
839 freeze();
840 my $collated_output = &collate_output();
841
842 if ($job_has_uuid) {
843   $Job->update_attributes('running' => 0,
844                           'success' => $collated_output && $main::success,
845                           'finished_at' => scalar gmtime)
846 }
847
848 if (!$collated_output) {
849   Log(undef, "output undef");
850 }
851 else {
852   eval {
853     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
854         or die "failed to get collated manifest: $!";
855     my $orig_manifest_text = '';
856     while (my $manifest_line = <$orig_manifest>) {
857       $orig_manifest_text .= $manifest_line;
858     }
859     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
860       'manifest_text' => $orig_manifest_text,
861     });
862     Log(undef, "output " . $output->{portable_data_hash});
863     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
864   };
865   if ($@) {
866     Log (undef, "Failed to register output manifest: $@");
867   }
868 }
869
870 Log (undef, "finish");
871
872 save_meta();
873 exit ($Job->{'success'} ? 1 : 0);
874
875
876
877 sub update_progress_stats
878 {
879   $progress_stats_updated = time;
880   return if !$progress_is_dirty;
881   my ($todo, $done, $running) = (scalar @jobstep_todo,
882                                  scalar @jobstep_done,
883                                  scalar @slot - scalar @freeslot - scalar @holdslot);
884   $Job->{'tasks_summary'} ||= {};
885   $Job->{'tasks_summary'}->{'todo'} = $todo;
886   $Job->{'tasks_summary'}->{'done'} = $done;
887   $Job->{'tasks_summary'}->{'running'} = $running;
888   if ($job_has_uuid) {
889     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
890   }
891   Log (undef, "status: $done done, $running running, $todo todo");
892   $progress_is_dirty = 0;
893 }
894
895
896
897 sub reapchildren
898 {
899   my $pid = waitpid (-1, WNOHANG);
900   return 0 if $pid <= 0;
901
902   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
903                   . "."
904                   . $slot[$proc{$pid}->{slot}]->{cpu});
905   my $jobstepid = $proc{$pid}->{jobstep};
906   my $elapsed = time - $proc{$pid}->{time};
907   my $Jobstep = $jobstep[$jobstepid];
908
909   my $childstatus = $?;
910   my $exitvalue = $childstatus >> 8;
911   my $exitinfo = sprintf("exit %d signal %d%s",
912                          $exitvalue,
913                          $childstatus & 127,
914                          ($childstatus & 128 ? ' core dump' : ''));
915   $Jobstep->{'arvados_task'}->reload;
916   my $task_success = $Jobstep->{'arvados_task'}->{success};
917
918   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
919
920   if (!defined $task_success) {
921     # task did not indicate one way or the other --> fail
922     $Jobstep->{'arvados_task'}->{success} = 0;
923     $Jobstep->{'arvados_task'}->save;
924     $task_success = 0;
925   }
926
927   if (!$task_success)
928   {
929     my $temporary_fail;
930     $temporary_fail ||= $Jobstep->{node_fail};
931     $temporary_fail ||= ($exitvalue == 111);
932
933     ++$thisround_failed;
934     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
935
936     # Check for signs of a failed or misconfigured node
937     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
938         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
939       # Don't count this against jobstep failure thresholds if this
940       # node is already suspected faulty and srun exited quickly
941       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
942           $elapsed < 5) {
943         Log ($jobstepid, "blaming failure on suspect node " .
944              $slot[$proc{$pid}->{slot}]->{node}->{name});
945         $temporary_fail ||= 1;
946       }
947       ban_node_by_slot($proc{$pid}->{slot});
948     }
949
950     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
951                              ++$Jobstep->{'failures'},
952                              $temporary_fail ? 'temporary ' : 'permanent',
953                              $elapsed));
954
955     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
956       # Give up on this task, and the whole job
957       $main::success = 0;
958       $main::please_freeze = 1;
959     }
960     else {
961       # Put this task back on the todo queue
962       push @jobstep_todo, $jobstepid;
963     }
964     $Job->{'tasks_summary'}->{'failed'}++;
965   }
966   else
967   {
968     ++$thisround_succeeded;
969     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
970     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
971     push @jobstep_done, $jobstepid;
972     Log ($jobstepid, "success in $elapsed seconds");
973   }
974   $Jobstep->{exitcode} = $childstatus;
975   $Jobstep->{finishtime} = time;
976   process_stderr ($jobstepid, $task_success);
977   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
978
979   close $reader{$jobstepid};
980   delete $reader{$jobstepid};
981   delete $slot[$proc{$pid}->{slot}]->{pid};
982   push @freeslot, $proc{$pid}->{slot};
983   delete $proc{$pid};
984
985   if ($task_success) {
986     # Load new tasks
987     my $newtask_list = [];
988     my $newtask_results;
989     do {
990       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
991         'where' => {
992           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
993         },
994         'order' => 'qsequence',
995         'offset' => scalar(@$newtask_list),
996       );
997       push(@$newtask_list, @{$newtask_results->{items}});
998     } while (@{$newtask_results->{items}});
999     foreach my $arvados_task (@$newtask_list) {
1000       my $jobstep = {
1001         'level' => $arvados_task->{'sequence'},
1002         'failures' => 0,
1003         'arvados_task' => $arvados_task
1004       };
1005       push @jobstep, $jobstep;
1006       push @jobstep_todo, $#jobstep;
1007     }
1008   }
1009
1010   $progress_is_dirty = 1;
1011   1;
1012 }
1013
1014 sub check_refresh_wanted
1015 {
1016   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1017   if (@stat && $stat[9] > $latest_refresh) {
1018     $latest_refresh = scalar time;
1019     if ($job_has_uuid) {
1020       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1021       for my $attr ('cancelled_at',
1022                     'cancelled_by_user_uuid',
1023                     'cancelled_by_client_uuid') {
1024         $Job->{$attr} = $Job2->{$attr};
1025       }
1026       if ($Job->{'cancelled_at'}) {
1027         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1028              " by user " . $Job->{cancelled_by_user_uuid});
1029         $main::success = 0;
1030         $main::please_freeze = 1;
1031       }
1032     }
1033   }
1034 }
1035
1036 sub check_squeue
1037 {
1038   # return if the kill list was checked <4 seconds ago
1039   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1040   {
1041     return;
1042   }
1043   $squeue_kill_checked = time;
1044
1045   # use killem() on procs whose killtime is reached
1046   for (keys %proc)
1047   {
1048     if (exists $proc{$_}->{killtime}
1049         && $proc{$_}->{killtime} <= time)
1050     {
1051       killem ($_);
1052     }
1053   }
1054
1055   # return if the squeue was checked <60 seconds ago
1056   if (defined $squeue_checked && $squeue_checked > time - 60)
1057   {
1058     return;
1059   }
1060   $squeue_checked = time;
1061
1062   if (!$have_slurm)
1063   {
1064     # here is an opportunity to check for mysterious problems with local procs
1065     return;
1066   }
1067
1068   # get a list of steps still running
1069   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1070   chop @squeue;
1071   if ($squeue[-1] ne "ok")
1072   {
1073     return;
1074   }
1075   pop @squeue;
1076
1077   # which of my jobsteps are running, according to squeue?
1078   my %ok;
1079   foreach (@squeue)
1080   {
1081     if (/^(\d+)\.(\d+) (\S+)/)
1082     {
1083       if ($1 eq $ENV{SLURM_JOBID})
1084       {
1085         $ok{$3} = 1;
1086       }
1087     }
1088   }
1089
1090   # which of my active child procs (>60s old) were not mentioned by squeue?
1091   foreach (keys %proc)
1092   {
1093     if ($proc{$_}->{time} < time - 60
1094         && !exists $ok{$proc{$_}->{jobstepname}}
1095         && !exists $proc{$_}->{killtime})
1096     {
1097       # kill this proc if it hasn't exited in 30 seconds
1098       $proc{$_}->{killtime} = time + 30;
1099     }
1100   }
1101 }
1102
1103
1104 sub release_allocation
1105 {
1106   if ($have_slurm)
1107   {
1108     Log (undef, "release job allocation");
1109     system "scancel $ENV{SLURM_JOBID}";
1110   }
1111 }
1112
1113
1114 sub readfrompipes
1115 {
1116   my $gotsome = 0;
1117   foreach my $job (keys %reader)
1118   {
1119     my $buf;
1120     while (0 < sysread ($reader{$job}, $buf, 8192))
1121     {
1122       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1123       $jobstep[$job]->{stderr} .= $buf;
1124       preprocess_stderr ($job);
1125       if (length ($jobstep[$job]->{stderr}) > 16384)
1126       {
1127         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1128       }
1129       $gotsome = 1;
1130     }
1131   }
1132   return $gotsome;
1133 }
1134
1135
1136 sub preprocess_stderr
1137 {
1138   my $job = shift;
1139
1140   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1141     my $line = $1;
1142     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1143     Log ($job, "stderr $line");
1144     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1145       # whoa.
1146       $main::please_freeze = 1;
1147     }
1148     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1149       $jobstep[$job]->{node_fail} = 1;
1150       ban_node_by_slot($jobstep[$job]->{slotindex});
1151     }
1152   }
1153 }
1154
1155
1156 sub process_stderr
1157 {
1158   my $job = shift;
1159   my $task_success = shift;
1160   preprocess_stderr ($job);
1161
1162   map {
1163     Log ($job, "stderr $_");
1164   } split ("\n", $jobstep[$job]->{stderr});
1165 }
1166
1167 sub fetch_block
1168 {
1169   my $hash = shift;
1170   my ($keep, $child_out, $output_block);
1171
1172   my $cmd = "arv-get \Q$hash\E";
1173   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1174   $output_block = '';
1175   while (1) {
1176     my $buf;
1177     my $bytes = sysread($keep, $buf, 1024 * 1024);
1178     if (!defined $bytes) {
1179       die "reading from arv-get: $!";
1180     } elsif ($bytes == 0) {
1181       # sysread returns 0 at the end of the pipe.
1182       last;
1183     } else {
1184       # some bytes were read into buf.
1185       $output_block .= $buf;
1186     }
1187   }
1188   close $keep;
1189   return $output_block;
1190 }
1191
1192 sub collate_output
1193 {
1194   Log (undef, "collate");
1195
1196   my ($child_out, $child_in);
1197   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1198   my $joboutput;
1199   for (@jobstep)
1200   {
1201     next if (!exists $_->{'arvados_task'}->{'output'} ||
1202              !$_->{'arvados_task'}->{'success'});
1203     my $output = $_->{'arvados_task'}->{output};
1204     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1205     {
1206       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1207       print $child_in $output;
1208     }
1209     elsif (@jobstep == 1)
1210     {
1211       $joboutput = $output;
1212       last;
1213     }
1214     elsif (defined (my $outblock = fetch_block ($output)))
1215     {
1216       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1217       print $child_in $outblock;
1218     }
1219     else
1220     {
1221       Log (undef, "XXX fetch_block($output) failed XXX");
1222       $main::success = 0;
1223     }
1224   }
1225   $child_in->close;
1226
1227   if (!defined $joboutput) {
1228     my $s = IO::Select->new($child_out);
1229     if ($s->can_read(120)) {
1230       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1231       chomp($joboutput);
1232     } else {
1233       Log (undef, "timed out reading from 'arv-put'");
1234     }
1235   }
1236   waitpid($pid, 0);
1237
1238   return $joboutput;
1239 }
1240
1241
1242 sub killem
1243 {
1244   foreach (@_)
1245   {
1246     my $sig = 2;                # SIGINT first
1247     if (exists $proc{$_}->{"sent_$sig"} &&
1248         time - $proc{$_}->{"sent_$sig"} > 4)
1249     {
1250       $sig = 15;                # SIGTERM if SIGINT doesn't work
1251     }
1252     if (exists $proc{$_}->{"sent_$sig"} &&
1253         time - $proc{$_}->{"sent_$sig"} > 4)
1254     {
1255       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1256     }
1257     if (!exists $proc{$_}->{"sent_$sig"})
1258     {
1259       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1260       kill $sig, $_;
1261       select (undef, undef, undef, 0.1);
1262       if ($sig == 2)
1263       {
1264         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1265       }
1266       $proc{$_}->{"sent_$sig"} = time;
1267       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1268     }
1269   }
1270 }
1271
1272
1273 sub fhbits
1274 {
1275   my($bits);
1276   for (@_) {
1277     vec($bits,fileno($_),1) = 1;
1278   }
1279   $bits;
1280 }
1281
1282
1283 sub Log                         # ($jobstep_id, $logmessage)
1284 {
1285   if ($_[1] =~ /\n/) {
1286     for my $line (split (/\n/, $_[1])) {
1287       Log ($_[0], $line);
1288     }
1289     return;
1290   }
1291   my $fh = select STDERR; $|=1; select $fh;
1292   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1293   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1294   $message .= "\n";
1295   my $datetime;
1296   if ($local_logfile || -t STDERR) {
1297     my @gmtime = gmtime;
1298     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1299                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1300   }
1301   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1302
1303   if ($local_logfile) {
1304     print $local_logfile $datetime . " " . $message;
1305   }
1306 }
1307
1308
1309 sub croak
1310 {
1311   my ($package, $file, $line) = caller;
1312   my $message = "@_ at $file line $line\n";
1313   Log (undef, $message);
1314   freeze() if @jobstep_todo;
1315   collate_output() if @jobstep_todo;
1316   cleanup();
1317   save_meta() if $local_logfile;
1318   die;
1319 }
1320
1321
1322 sub cleanup
1323 {
1324   return if !$job_has_uuid;
1325   $Job->update_attributes('running' => 0,
1326                           'success' => 0,
1327                           'finished_at' => scalar gmtime);
1328 }
1329
1330
1331 sub save_meta
1332 {
1333   my $justcheckpoint = shift; # false if this will be the last meta saved
1334   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1335
1336   $local_logfile->flush;
1337   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1338       . quotemeta($local_logfile->filename);
1339   my $loglocator = `$cmd`;
1340   die "system $cmd failed: $?" if $?;
1341   chomp($loglocator);
1342
1343   $local_logfile = undef;   # the temp file is automatically deleted
1344   Log (undef, "log manifest is $loglocator");
1345   $Job->{'log'} = $loglocator;
1346   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1347 }
1348
1349
1350 sub freeze_if_want_freeze
1351 {
1352   if ($main::please_freeze)
1353   {
1354     release_allocation();
1355     if (@_)
1356     {
1357       # kill some srun procs before freeze+stop
1358       map { $proc{$_} = {} } @_;
1359       while (%proc)
1360       {
1361         killem (keys %proc);
1362         select (undef, undef, undef, 0.1);
1363         my $died;
1364         while (($died = waitpid (-1, WNOHANG)) > 0)
1365         {
1366           delete $proc{$died};
1367         }
1368       }
1369     }
1370     freeze();
1371     collate_output();
1372     cleanup();
1373     save_meta();
1374     exit 0;
1375   }
1376 }
1377
1378
1379 sub freeze
1380 {
1381   Log (undef, "Freeze not implemented");
1382   return;
1383 }
1384
1385
1386 sub thaw
1387 {
1388   croak ("Thaw not implemented");
1389 }
1390
1391
1392 sub freezequote
1393 {
1394   my $s = shift;
1395   $s =~ s/\\/\\\\/g;
1396   $s =~ s/\n/\\n/g;
1397   return $s;
1398 }
1399
1400
1401 sub freezeunquote
1402 {
1403   my $s = shift;
1404   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1405   return $s;
1406 }
1407
1408
1409 sub srun
1410 {
1411   my $srunargs = shift;
1412   my $execargs = shift;
1413   my $opts = shift || {};
1414   my $stdin = shift;
1415   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1416   print STDERR (join (" ",
1417                       map { / / ? "'$_'" : $_ }
1418                       (@$args)),
1419                 "\n")
1420       if $ENV{CRUNCH_DEBUG};
1421
1422   if (defined $stdin) {
1423     my $child = open STDIN, "-|";
1424     defined $child or die "no fork: $!";
1425     if ($child == 0) {
1426       print $stdin or die $!;
1427       close STDOUT or die $!;
1428       exit 0;
1429     }
1430   }
1431
1432   return system (@$args) if $opts->{fork};
1433
1434   exec @$args;
1435   warn "ENV size is ".length(join(" ",%ENV));
1436   die "exec failed: $!: @$args";
1437 }
1438
1439
1440 sub ban_node_by_slot {
1441   # Don't start any new jobsteps on this node for 60 seconds
1442   my $slotid = shift;
1443   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1444   $slot[$slotid]->{node}->{hold_count}++;
1445   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1446 }
1447
1448 sub must_lock_now
1449 {
1450   my ($lockfile, $error_message) = @_;
1451   open L, ">", $lockfile or croak("$lockfile: $!");
1452   if (!flock L, LOCK_EX|LOCK_NB) {
1453     croak("Can't lock $lockfile: $error_message\n");
1454   }
1455 }
1456
1457 sub find_docker_image {
1458   # Given a Keep locator, check to see if it contains a Docker image.
1459   # If so, return its stream name and Docker hash.
1460   # If not, return undef for both values.
1461   my $locator = shift;
1462   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1463     my @file_list = @{$image->{files}};
1464     if ((scalar(@file_list) == 1) &&
1465         ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1466       return ($file_list[0][0], $1);
1467     }
1468   }
1469   return (undef, undef);
1470 }
1471
1472 __DATA__
1473 #!/usr/bin/perl
1474
1475 # checkout-and-build
1476
1477 use Fcntl ':flock';
1478 use File::Path qw( make_path );
1479
1480 my $destdir = $ENV{"CRUNCH_SRC"};
1481 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1482 my $repo = $ENV{"CRUNCH_SRC_URL"};
1483 my $task_work = $ENV{"TASK_WORK"};
1484
1485 if ($task_work) {
1486     make_path $task_work;
1487     -e $task_work or die "Failed to create temporary working directory ($task_work): $!";
1488 }
1489
1490 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1491 flock L, LOCK_EX;
1492 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1493     exit 0;
1494 }
1495
1496 unlink "$destdir.commit";
1497 open STDOUT, ">", "$destdir.log";
1498 open STDERR, ">&STDOUT";
1499
1500 mkdir $destdir;
1501 my @git_archive_data = <DATA>;
1502 if (@git_archive_data) {
1503   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1504   print TARX @git_archive_data;
1505   if(!close(TARX)) {
1506     die "'tar -C $destdir -xf -' exited $?: $!";
1507   }
1508 }
1509
1510 my $pwd;
1511 chomp ($pwd = `pwd`);
1512 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1513 mkdir $install_dir;
1514
1515 for my $src_path ("$destdir/arvados/sdk/python") {
1516   if (-d $src_path) {
1517     shell_or_die ("virtualenv", $install_dir);
1518     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1519   }
1520 }
1521
1522 if (-e "$destdir/crunch_scripts/install") {
1523     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1524 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1525     # Old version
1526     shell_or_die ("./tests/autotests.sh", $install_dir);
1527 } elsif (-e "./install.sh") {
1528     shell_or_die ("./install.sh", $install_dir);
1529 }
1530
1531 if ($commit) {
1532     unlink "$destdir.commit.new";
1533     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1534     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1535 }
1536
1537 close L;
1538
1539 exit 0;
1540
1541 sub shell_or_die
1542 {
1543   if ($ENV{"DEBUG"}) {
1544     print STDERR "@_\n";
1545   }
1546   system (@_) == 0
1547       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1548 }
1549
1550 __DATA__