Complain and fail if exec() returns. No issue #
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 use constant EX_TEMPFAIL => 75;
88
89 $ENV{"TMPDIR"} ||= "/tmp";
90 unless (defined $ENV{"CRUNCH_TMP"}) {
91   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
92   if ($ENV{"USER"} ne "crunch" && $< != 0) {
93     # use a tmp dir unique for my uid
94     $ENV{"CRUNCH_TMP"} .= "-$<";
95   }
96 }
97
98 # Create the tmp directory if it does not exist
99 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
100   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 }
102
103 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
104 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
105 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
106 mkdir ($ENV{"JOB_WORK"});
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # If some other crunch-job process has grabbed this job (or we see
156     # other evidence that the job is already underway) we exit
157     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158     # mark the job as failed.
159     if ($Job->{'is_locked_by_uuid'}) {
160       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
161       exit EX_TEMPFAIL;
162     }
163     if ($Job->{'success'} ne undef) {
164       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
165       exit EX_TEMPFAIL;
166     }
167     if ($Job->{'running'}) {
168       Log(undef, "Job 'running' flag is already set");
169       exit EX_TEMPFAIL;
170     }
171     if ($Job->{'started_at'}) {
172       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
173       exit EX_TEMPFAIL;
174     }
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189
190   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191
192   $job_has_uuid = 1;
193 }
194 $job_id = $Job->{'uuid'};
195
196 my $keep_logfile = $job_id . '.log.txt';
197 $local_logfile = File::Temp->new();
198
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202
203
204 Log (undef, "check slurm allocation");
205 my @slot;
206 my @node;
207 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my @sinfo;
209 if (!$have_slurm)
210 {
211   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
212   push @sinfo, "$localcpus localhost";
213 }
214 if (exists $ENV{SLURM_NODELIST})
215 {
216   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 }
218 foreach (@sinfo)
219 {
220   my ($ncpus, $slurm_nodelist) = split;
221   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
222
223   my @nodelist;
224   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
225   {
226     my $nodelist = $1;
227     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
228     {
229       my $ranges = $1;
230       foreach (split (",", $ranges))
231       {
232         my ($a, $b);
233         if (/(\d+)-(\d+)/)
234         {
235           $a = $1;
236           $b = $2;
237         }
238         else
239         {
240           $a = $_;
241           $b = $_;
242         }
243         push @nodelist, map {
244           my $n = $nodelist;
245           $n =~ s/\[[-,\d]+\]/$_/;
246           $n;
247         } ($a..$b);
248       }
249     }
250     else
251     {
252       push @nodelist, $nodelist;
253     }
254   }
255   foreach my $nodename (@nodelist)
256   {
257     Log (undef, "node $nodename - $ncpus slots");
258     my $node = { name => $nodename,
259                  ncpus => $ncpus,
260                  losing_streak => 0,
261                  hold_until => 0 };
262     foreach my $cpu (1..$ncpus)
263     {
264       push @slot, { node => $node,
265                     cpu => $cpu };
266     }
267   }
268   push @node, @nodelist;
269 }
270
271
272
273 # Ensure that we get one jobstep running on each allocated node before
274 # we start overloading nodes with concurrent steps
275
276 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
277
278
279
280 my $jobmanager_id;
281 if ($job_has_uuid)
282 {
283   # Claim this job, and make sure nobody else does
284   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
285           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
286     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
287     exit EX_TEMPFAIL;
288   }
289   $Job->update_attributes('started_at' => scalar gmtime,
290                           'running' => 1,
291                           'success' => undef,
292                           'tasks_summary' => { 'failed' => 0,
293                                                'todo' => 1,
294                                                'running' => 0,
295                                                'done' => 0 });
296 }
297
298
299 Log (undef, "start");
300 $SIG{'INT'} = sub { $main::please_freeze = 1; };
301 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
302 $SIG{'TERM'} = \&croak;
303 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
304 $SIG{'ALRM'} = sub { $main::please_info = 1; };
305 $SIG{'CONT'} = sub { $main::please_continue = 1; };
306 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
307
308 $main::please_freeze = 0;
309 $main::please_info = 0;
310 $main::please_continue = 0;
311 $main::please_refresh = 0;
312 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
313
314 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
315 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
316 $ENV{"JOB_UUID"} = $job_id;
317
318
319 my @jobstep;
320 my @jobstep_todo = ();
321 my @jobstep_done = ();
322 my @jobstep_tomerge = ();
323 my $jobstep_tomerge_level = 0;
324 my $squeue_checked;
325 my $squeue_kill_checked;
326 my $output_in_keep = 0;
327 my $latest_refresh = scalar time;
328
329
330
331 if (defined $Job->{thawedfromkey})
332 {
333   thaw ($Job->{thawedfromkey});
334 }
335 else
336 {
337   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
338     'job_uuid' => $Job->{'uuid'},
339     'sequence' => 0,
340     'qsequence' => 0,
341     'parameters' => {},
342                                                           });
343   push @jobstep, { 'level' => 0,
344                    'failures' => 0,
345                    'arvados_task' => $first_task,
346                  };
347   push @jobstep_todo, 0;
348 }
349
350
351 if (!$have_slurm)
352 {
353   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
354 }
355
356
357 my $build_script;
358
359
360 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
361
362 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
363 if ($skip_install)
364 {
365   if (!defined $no_clear_tmp) {
366     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
367     system($clear_tmp_cmd) == 0
368         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
369   }
370   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
371   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
372     if (-d $src_path) {
373       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
374           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
375       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
376           == 0
377           or croak ("setup.py in $src_path failed: exit ".($?>>8));
378     }
379   }
380 }
381 else
382 {
383   do {
384     local $/ = undef;
385     $build_script = <DATA>;
386   };
387   Log (undef, "Install revision ".$Job->{script_version});
388   my $nodelist = join(",", @node);
389
390   if (!defined $no_clear_tmp) {
391     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
392
393     my $cleanpid = fork();
394     if ($cleanpid == 0)
395     {
396       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
397             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
398       exit (1);
399     }
400     while (1)
401     {
402       last if $cleanpid == waitpid (-1, WNOHANG);
403       freeze_if_want_freeze ($cleanpid);
404       select (undef, undef, undef, 0.1);
405     }
406     Log (undef, "Clean-work-dir exited $?");
407   }
408
409   # Install requested code version
410
411   my @execargs;
412   my @srunargs = ("srun",
413                   "--nodelist=$nodelist",
414                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
415
416   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
417   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
418
419   my $commit;
420   my $git_archive;
421   my $treeish = $Job->{'script_version'};
422
423   # If we're running under crunch-dispatch, it will have pulled the
424   # appropriate source tree into its own repository, and given us that
425   # repo's path as $git_dir. If we're running a "local" job, and a
426   # script_version was specified, it's up to the user to provide the
427   # full path to a local repository in Job->{repository}.
428   #
429   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
430   # git-archive --remote where appropriate.
431   #
432   # TODO: Accept a locally-hosted Arvados repository by name or
433   # UUID. Use arvados.v1.repositories.list or .get to figure out the
434   # appropriate fetch-url.
435   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
436
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   if (-d "$repo/.git") {
440     # We were given a working directory, but we are only interested in
441     # the index.
442     $repo = "$repo/.git";
443   }
444
445   # If this looks like a subversion r#, look for it in git-svn commit messages
446
447   if ($treeish =~ m{^\d{1,4}$}) {
448     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
449     chomp $gitlog;
450     if ($gitlog =~ /^[a-f0-9]{40}$/) {
451       $commit = $gitlog;
452       Log (undef, "Using commit $commit for script_version $treeish");
453     }
454   }
455
456   # If that didn't work, try asking git to look it up as a tree-ish.
457
458   if (!defined $commit) {
459     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
460     chomp $found;
461     if ($found =~ /^[0-9a-f]{40}$/s) {
462       $commit = $found;
463       if ($commit ne $treeish) {
464         # Make sure we record the real commit id in the database,
465         # frozentokey, logs, etc. -- instead of an abbreviation or a
466         # branch name which can become ambiguous or point to a
467         # different commit in the future.
468         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
469         Log (undef, "Using commit $commit for tree-ish $treeish");
470         if ($commit ne $treeish) {
471           $Job->{'script_version'} = $commit;
472           !$job_has_uuid or
473               $Job->update_attributes('script_version' => $commit) or
474               croak("Error while updating job");
475         }
476       }
477     }
478   }
479
480   if (defined $commit) {
481     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482     @execargs = ("sh", "-c",
483                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485   }
486   else {
487     croak ("could not figure out commit id for $treeish");
488   }
489
490   # Note: this section is almost certainly unnecessary if we're
491   # running tasks in docker containers.
492   my $installpid = fork();
493   if ($installpid == 0)
494   {
495     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
496     exit (1);
497   }
498   while (1)
499   {
500     last if $installpid == waitpid (-1, WNOHANG);
501     freeze_if_want_freeze ($installpid);
502     select (undef, undef, undef, 0.1);
503   }
504   Log (undef, "Install exited $?");
505 }
506
507 if (!$have_slurm)
508 {
509   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
510   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
511 }
512
513 # If this job requires a Docker image, install that.
514 my $docker_bin = "/usr/bin/docker.io";
515 my ($docker_locator, $docker_stream, $docker_hash);
516 if ($docker_locator = $Job->{docker_image_locator}) {
517   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
518   if (!$docker_hash)
519   {
520     croak("No Docker image hash found from locator $docker_locator");
521   }
522   $docker_stream =~ s/^\.//;
523   my $docker_install_script = qq{
524 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
525     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
526 fi
527 };
528   my $docker_pid = fork();
529   if ($docker_pid == 0)
530   {
531     srun (["srun", "--nodelist=" . join(',', @node)],
532           ["/bin/sh", "-ec", $docker_install_script]);
533     exit ($?);
534   }
535   while (1)
536   {
537     last if $docker_pid == waitpid (-1, WNOHANG);
538     freeze_if_want_freeze ($docker_pid);
539     select (undef, undef, undef, 0.1);
540   }
541   if ($? != 0)
542   {
543     croak("Installing Docker image from $docker_locator returned exit code $?");
544   }
545 }
546
547 foreach (qw (script script_version script_parameters runtime_constraints))
548 {
549   Log (undef,
550        "$_ " .
551        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
552 }
553 foreach (split (/\n/, $Job->{knobs}))
554 {
555   Log (undef, "knob " . $_);
556 }
557
558
559
560 $main::success = undef;
561
562
563
564 ONELEVEL:
565
566 my $thisround_succeeded = 0;
567 my $thisround_failed = 0;
568 my $thisround_failed_multiple = 0;
569
570 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
571                        or $a <=> $b } @jobstep_todo;
572 my $level = $jobstep[$jobstep_todo[0]]->{level};
573 Log (undef, "start level $level");
574
575
576
577 my %proc;
578 my @freeslot = (0..$#slot);
579 my @holdslot;
580 my %reader;
581 my $progress_is_dirty = 1;
582 my $progress_stats_updated = 0;
583
584 update_progress_stats();
585
586
587
588 THISROUND:
589 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
590 {
591   my $id = $jobstep_todo[$todo_ptr];
592   my $Jobstep = $jobstep[$id];
593   if ($Jobstep->{level} != $level)
594   {
595     next;
596   }
597
598   pipe $reader{$id}, "writer" or croak ($!);
599   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
600   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
601
602   my $childslot = $freeslot[0];
603   my $childnode = $slot[$childslot]->{node};
604   my $childslotname = join (".",
605                             $slot[$childslot]->{node}->{name},
606                             $slot[$childslot]->{cpu});
607   my $childpid = fork();
608   if ($childpid == 0)
609   {
610     $SIG{'INT'} = 'DEFAULT';
611     $SIG{'QUIT'} = 'DEFAULT';
612     $SIG{'TERM'} = 'DEFAULT';
613
614     foreach (values (%reader))
615     {
616       close($_);
617     }
618     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
619     open(STDOUT,">&writer");
620     open(STDERR,">&writer");
621
622     undef $dbh;
623     undef $sth;
624
625     delete $ENV{"GNUPGHOME"};
626     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
627     $ENV{"TASK_QSEQUENCE"} = $id;
628     $ENV{"TASK_SEQUENCE"} = $level;
629     $ENV{"JOB_SCRIPT"} = $Job->{script};
630     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
631       $param =~ tr/a-z/A-Z/;
632       $ENV{"JOB_PARAMETER_$param"} = $value;
633     }
634     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
635     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
636     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
637     $ENV{"HOME"} = $ENV{"TASK_WORK"};
638     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
639     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
640     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
641     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
642
643     $ENV{"GZIP"} = "-n";
644
645     my @srunargs = (
646       "srun",
647       "--nodelist=".$childnode->{name},
648       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
649       "--job-name=$job_id.$id.$$",
650         );
651     my $build_script_to_send = "";
652     my $command =
653         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
654         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
655         ."&& cd $ENV{CRUNCH_TMP} ";
656     if ($build_script)
657     {
658       $build_script_to_send = $build_script;
659     }
660     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
661     if ($docker_hash)
662     {
663       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665       # Dynamically configure the container to use the host system as its
666       # DNS server.  Get the host's global addresses from the ip command,
667       # and turn them into docker --dns options using gawk.
668       $command .=
669           q{$(ip -o address show scope global |
670               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
672       $command .= "--env=\QHOME=/home/crunch\E ";
673       while (my ($env_key, $env_val) = each %ENV)
674       {
675         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
676           if ($env_key eq "TASK_KEEPMOUNT") {
677             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
678           }
679           else {
680             $command .= "--env=\Q$env_key=$env_val\E ";
681           }
682         }
683       }
684       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
685       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
686       $command .= "\Q$docker_hash\E ";
687       $command .= "stdbuf --output=0 --error=0 ";
688       $command .= "perl - ";
689       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
690     } else {
691       # Non-docker run
692       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
693       $command .= "stdbuf --output=0 --error=0 ";
694       $command .= "perl - ";
695       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
696     }
697
698     my @execargs = ('bash', '-c', $command);
699     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
700     # exec() failed, we assume nothing happened.
701     Log(undef, "srun() failed on build script");
702     die;
703   }
704   close("writer");
705   if (!defined $childpid)
706   {
707     close $reader{$id};
708     delete $reader{$id};
709     next;
710   }
711   shift @freeslot;
712   $proc{$childpid} = { jobstep => $id,
713                        time => time,
714                        slot => $childslot,
715                        jobstepname => "$job_id.$id.$childpid",
716                      };
717   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
718   $slot[$childslot]->{pid} = $childpid;
719
720   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
721   Log ($id, "child $childpid started on $childslotname");
722   $Jobstep->{starttime} = time;
723   $Jobstep->{node} = $childnode->{name};
724   $Jobstep->{slotindex} = $childslot;
725   delete $Jobstep->{stderr};
726   delete $Jobstep->{finishtime};
727
728   splice @jobstep_todo, $todo_ptr, 1;
729   --$todo_ptr;
730
731   $progress_is_dirty = 1;
732
733   while (!@freeslot
734          ||
735          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
736   {
737     last THISROUND if $main::please_freeze;
738     if ($main::please_info)
739     {
740       $main::please_info = 0;
741       freeze();
742       collate_output();
743       save_meta(1);
744       update_progress_stats();
745     }
746     my $gotsome
747         = readfrompipes ()
748         + reapchildren ();
749     if (!$gotsome)
750     {
751       check_refresh_wanted();
752       check_squeue();
753       update_progress_stats();
754       select (undef, undef, undef, 0.1);
755     }
756     elsif (time - $progress_stats_updated >= 30)
757     {
758       update_progress_stats();
759     }
760     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
761         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
762     {
763       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
764           .($thisround_failed+$thisround_succeeded)
765           .") -- giving up on this round";
766       Log (undef, $message);
767       last THISROUND;
768     }
769
770     # move slots from freeslot to holdslot (or back to freeslot) if necessary
771     for (my $i=$#freeslot; $i>=0; $i--) {
772       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
773         push @holdslot, (splice @freeslot, $i, 1);
774       }
775     }
776     for (my $i=$#holdslot; $i>=0; $i--) {
777       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
778         push @freeslot, (splice @holdslot, $i, 1);
779       }
780     }
781
782     # give up if no nodes are succeeding
783     if (!grep { $_->{node}->{losing_streak} == 0 &&
784                     $_->{node}->{hold_count} < 4 } @slot) {
785       my $message = "Every node has failed -- giving up on this round";
786       Log (undef, $message);
787       last THISROUND;
788     }
789   }
790 }
791
792
793 push @freeslot, splice @holdslot;
794 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
795
796
797 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
798 while (%proc)
799 {
800   if ($main::please_continue) {
801     $main::please_continue = 0;
802     goto THISROUND;
803   }
804   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
805   readfrompipes ();
806   if (!reapchildren())
807   {
808     check_refresh_wanted();
809     check_squeue();
810     update_progress_stats();
811     select (undef, undef, undef, 0.1);
812     killem (keys %proc) if $main::please_freeze;
813   }
814 }
815
816 update_progress_stats();
817 freeze_if_want_freeze();
818
819
820 if (!defined $main::success)
821 {
822   if (@jobstep_todo &&
823       $thisround_succeeded == 0 &&
824       ($thisround_failed == 0 || $thisround_failed > 4))
825   {
826     my $message = "stop because $thisround_failed tasks failed and none succeeded";
827     Log (undef, $message);
828     $main::success = 0;
829   }
830   if (!@jobstep_todo)
831   {
832     $main::success = 1;
833   }
834 }
835
836 goto ONELEVEL if !defined $main::success;
837
838
839 release_allocation();
840 freeze();
841 my $collated_output = &collate_output();
842
843 if ($job_has_uuid) {
844   $Job->update_attributes('running' => 0,
845                           'success' => $collated_output && $main::success,
846                           'finished_at' => scalar gmtime)
847 }
848
849 if (!$collated_output) {
850   Log(undef, "output undef");
851 }
852 else {
853   eval {
854     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
855         or die "failed to get collated manifest: $!";
856     my $orig_manifest_text = '';
857     while (my $manifest_line = <$orig_manifest>) {
858       $orig_manifest_text .= $manifest_line;
859     }
860     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
861       'manifest_text' => $orig_manifest_text,
862     });
863     Log(undef, "output " . $output->{portable_data_hash});
864     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
865   };
866   if ($@) {
867     Log (undef, "Failed to register output manifest: $@");
868   }
869 }
870
871 Log (undef, "finish");
872
873 save_meta();
874 exit ($Job->{'success'} ? 1 : 0);
875
876
877
878 sub update_progress_stats
879 {
880   $progress_stats_updated = time;
881   return if !$progress_is_dirty;
882   my ($todo, $done, $running) = (scalar @jobstep_todo,
883                                  scalar @jobstep_done,
884                                  scalar @slot - scalar @freeslot - scalar @holdslot);
885   $Job->{'tasks_summary'} ||= {};
886   $Job->{'tasks_summary'}->{'todo'} = $todo;
887   $Job->{'tasks_summary'}->{'done'} = $done;
888   $Job->{'tasks_summary'}->{'running'} = $running;
889   if ($job_has_uuid) {
890     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
891   }
892   Log (undef, "status: $done done, $running running, $todo todo");
893   $progress_is_dirty = 0;
894 }
895
896
897
898 sub reapchildren
899 {
900   my $pid = waitpid (-1, WNOHANG);
901   return 0 if $pid <= 0;
902
903   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
904                   . "."
905                   . $slot[$proc{$pid}->{slot}]->{cpu});
906   my $jobstepid = $proc{$pid}->{jobstep};
907   my $elapsed = time - $proc{$pid}->{time};
908   my $Jobstep = $jobstep[$jobstepid];
909
910   my $childstatus = $?;
911   my $exitvalue = $childstatus >> 8;
912   my $exitinfo = sprintf("exit %d signal %d%s",
913                          $exitvalue,
914                          $childstatus & 127,
915                          ($childstatus & 128 ? ' core dump' : ''));
916   $Jobstep->{'arvados_task'}->reload;
917   my $task_success = $Jobstep->{'arvados_task'}->{success};
918
919   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
920
921   if (!defined $task_success) {
922     # task did not indicate one way or the other --> fail
923     $Jobstep->{'arvados_task'}->{success} = 0;
924     $Jobstep->{'arvados_task'}->save;
925     $task_success = 0;
926   }
927
928   if (!$task_success)
929   {
930     my $temporary_fail;
931     $temporary_fail ||= $Jobstep->{node_fail};
932     $temporary_fail ||= ($exitvalue == 111);
933
934     ++$thisround_failed;
935     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
936
937     # Check for signs of a failed or misconfigured node
938     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
939         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
940       # Don't count this against jobstep failure thresholds if this
941       # node is already suspected faulty and srun exited quickly
942       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
943           $elapsed < 5) {
944         Log ($jobstepid, "blaming failure on suspect node " .
945              $slot[$proc{$pid}->{slot}]->{node}->{name});
946         $temporary_fail ||= 1;
947       }
948       ban_node_by_slot($proc{$pid}->{slot});
949     }
950
951     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
952                              ++$Jobstep->{'failures'},
953                              $temporary_fail ? 'temporary ' : 'permanent',
954                              $elapsed));
955
956     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
957       # Give up on this task, and the whole job
958       $main::success = 0;
959       $main::please_freeze = 1;
960     }
961     else {
962       # Put this task back on the todo queue
963       push @jobstep_todo, $jobstepid;
964     }
965     $Job->{'tasks_summary'}->{'failed'}++;
966   }
967   else
968   {
969     ++$thisround_succeeded;
970     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
971     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
972     push @jobstep_done, $jobstepid;
973     Log ($jobstepid, "success in $elapsed seconds");
974   }
975   $Jobstep->{exitcode} = $childstatus;
976   $Jobstep->{finishtime} = time;
977   process_stderr ($jobstepid, $task_success);
978   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
979
980   close $reader{$jobstepid};
981   delete $reader{$jobstepid};
982   delete $slot[$proc{$pid}->{slot}]->{pid};
983   push @freeslot, $proc{$pid}->{slot};
984   delete $proc{$pid};
985
986   if ($task_success) {
987     # Load new tasks
988     my $newtask_list = [];
989     my $newtask_results;
990     do {
991       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
992         'where' => {
993           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
994         },
995         'order' => 'qsequence',
996         'offset' => scalar(@$newtask_list),
997       );
998       push(@$newtask_list, @{$newtask_results->{items}});
999     } while (@{$newtask_results->{items}});
1000     foreach my $arvados_task (@$newtask_list) {
1001       my $jobstep = {
1002         'level' => $arvados_task->{'sequence'},
1003         'failures' => 0,
1004         'arvados_task' => $arvados_task
1005       };
1006       push @jobstep, $jobstep;
1007       push @jobstep_todo, $#jobstep;
1008     }
1009   }
1010
1011   $progress_is_dirty = 1;
1012   1;
1013 }
1014
1015 sub check_refresh_wanted
1016 {
1017   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1018   if (@stat && $stat[9] > $latest_refresh) {
1019     $latest_refresh = scalar time;
1020     if ($job_has_uuid) {
1021       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1022       for my $attr ('cancelled_at',
1023                     'cancelled_by_user_uuid',
1024                     'cancelled_by_client_uuid') {
1025         $Job->{$attr} = $Job2->{$attr};
1026       }
1027       if ($Job->{'cancelled_at'}) {
1028         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1029              " by user " . $Job->{cancelled_by_user_uuid});
1030         $main::success = 0;
1031         $main::please_freeze = 1;
1032       }
1033     }
1034   }
1035 }
1036
1037 sub check_squeue
1038 {
1039   # return if the kill list was checked <4 seconds ago
1040   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1041   {
1042     return;
1043   }
1044   $squeue_kill_checked = time;
1045
1046   # use killem() on procs whose killtime is reached
1047   for (keys %proc)
1048   {
1049     if (exists $proc{$_}->{killtime}
1050         && $proc{$_}->{killtime} <= time)
1051     {
1052       killem ($_);
1053     }
1054   }
1055
1056   # return if the squeue was checked <60 seconds ago
1057   if (defined $squeue_checked && $squeue_checked > time - 60)
1058   {
1059     return;
1060   }
1061   $squeue_checked = time;
1062
1063   if (!$have_slurm)
1064   {
1065     # here is an opportunity to check for mysterious problems with local procs
1066     return;
1067   }
1068
1069   # get a list of steps still running
1070   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1071   chop @squeue;
1072   if ($squeue[-1] ne "ok")
1073   {
1074     return;
1075   }
1076   pop @squeue;
1077
1078   # which of my jobsteps are running, according to squeue?
1079   my %ok;
1080   foreach (@squeue)
1081   {
1082     if (/^(\d+)\.(\d+) (\S+)/)
1083     {
1084       if ($1 eq $ENV{SLURM_JOBID})
1085       {
1086         $ok{$3} = 1;
1087       }
1088     }
1089   }
1090
1091   # which of my active child procs (>60s old) were not mentioned by squeue?
1092   foreach (keys %proc)
1093   {
1094     if ($proc{$_}->{time} < time - 60
1095         && !exists $ok{$proc{$_}->{jobstepname}}
1096         && !exists $proc{$_}->{killtime})
1097     {
1098       # kill this proc if it hasn't exited in 30 seconds
1099       $proc{$_}->{killtime} = time + 30;
1100     }
1101   }
1102 }
1103
1104
1105 sub release_allocation
1106 {
1107   if ($have_slurm)
1108   {
1109     Log (undef, "release job allocation");
1110     system "scancel $ENV{SLURM_JOBID}";
1111   }
1112 }
1113
1114
1115 sub readfrompipes
1116 {
1117   my $gotsome = 0;
1118   foreach my $job (keys %reader)
1119   {
1120     my $buf;
1121     while (0 < sysread ($reader{$job}, $buf, 8192))
1122     {
1123       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1124       $jobstep[$job]->{stderr} .= $buf;
1125       preprocess_stderr ($job);
1126       if (length ($jobstep[$job]->{stderr}) > 16384)
1127       {
1128         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1129       }
1130       $gotsome = 1;
1131     }
1132   }
1133   return $gotsome;
1134 }
1135
1136
1137 sub preprocess_stderr
1138 {
1139   my $job = shift;
1140
1141   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1142     my $line = $1;
1143     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1144     Log ($job, "stderr $line");
1145     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1146       # whoa.
1147       $main::please_freeze = 1;
1148     }
1149     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1150       $jobstep[$job]->{node_fail} = 1;
1151       ban_node_by_slot($jobstep[$job]->{slotindex});
1152     }
1153   }
1154 }
1155
1156
1157 sub process_stderr
1158 {
1159   my $job = shift;
1160   my $task_success = shift;
1161   preprocess_stderr ($job);
1162
1163   map {
1164     Log ($job, "stderr $_");
1165   } split ("\n", $jobstep[$job]->{stderr});
1166 }
1167
1168 sub fetch_block
1169 {
1170   my $hash = shift;
1171   my ($keep, $child_out, $output_block);
1172
1173   my $cmd = "arv-get \Q$hash\E";
1174   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1175   $output_block = '';
1176   while (1) {
1177     my $buf;
1178     my $bytes = sysread($keep, $buf, 1024 * 1024);
1179     if (!defined $bytes) {
1180       die "reading from arv-get: $!";
1181     } elsif ($bytes == 0) {
1182       # sysread returns 0 at the end of the pipe.
1183       last;
1184     } else {
1185       # some bytes were read into buf.
1186       $output_block .= $buf;
1187     }
1188   }
1189   close $keep;
1190   return $output_block;
1191 }
1192
1193 sub collate_output
1194 {
1195   Log (undef, "collate");
1196
1197   my ($child_out, $child_in);
1198   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1199   my $joboutput;
1200   for (@jobstep)
1201   {
1202     next if (!exists $_->{'arvados_task'}->{'output'} ||
1203              !$_->{'arvados_task'}->{'success'});
1204     my $output = $_->{'arvados_task'}->{output};
1205     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1206     {
1207       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1208       print $child_in $output;
1209     }
1210     elsif (@jobstep == 1)
1211     {
1212       $joboutput = $output;
1213       last;
1214     }
1215     elsif (defined (my $outblock = fetch_block ($output)))
1216     {
1217       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1218       print $child_in $outblock;
1219     }
1220     else
1221     {
1222       Log (undef, "XXX fetch_block($output) failed XXX");
1223       $main::success = 0;
1224     }
1225   }
1226   $child_in->close;
1227
1228   if (!defined $joboutput) {
1229     my $s = IO::Select->new($child_out);
1230     if ($s->can_read(120)) {
1231       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1232       chomp($joboutput);
1233     } else {
1234       Log (undef, "timed out reading from 'arv-put'");
1235     }
1236   }
1237   waitpid($pid, 0);
1238
1239   return $joboutput;
1240 }
1241
1242
1243 sub killem
1244 {
1245   foreach (@_)
1246   {
1247     my $sig = 2;                # SIGINT first
1248     if (exists $proc{$_}->{"sent_$sig"} &&
1249         time - $proc{$_}->{"sent_$sig"} > 4)
1250     {
1251       $sig = 15;                # SIGTERM if SIGINT doesn't work
1252     }
1253     if (exists $proc{$_}->{"sent_$sig"} &&
1254         time - $proc{$_}->{"sent_$sig"} > 4)
1255     {
1256       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1257     }
1258     if (!exists $proc{$_}->{"sent_$sig"})
1259     {
1260       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1261       kill $sig, $_;
1262       select (undef, undef, undef, 0.1);
1263       if ($sig == 2)
1264       {
1265         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1266       }
1267       $proc{$_}->{"sent_$sig"} = time;
1268       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1269     }
1270   }
1271 }
1272
1273
1274 sub fhbits
1275 {
1276   my($bits);
1277   for (@_) {
1278     vec($bits,fileno($_),1) = 1;
1279   }
1280   $bits;
1281 }
1282
1283
1284 sub Log                         # ($jobstep_id, $logmessage)
1285 {
1286   if ($_[1] =~ /\n/) {
1287     for my $line (split (/\n/, $_[1])) {
1288       Log ($_[0], $line);
1289     }
1290     return;
1291   }
1292   my $fh = select STDERR; $|=1; select $fh;
1293   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1294   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1295   $message .= "\n";
1296   my $datetime;
1297   if ($local_logfile || -t STDERR) {
1298     my @gmtime = gmtime;
1299     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1300                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1301   }
1302   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1303
1304   if ($local_logfile) {
1305     print $local_logfile $datetime . " " . $message;
1306   }
1307 }
1308
1309
1310 sub croak
1311 {
1312   my ($package, $file, $line) = caller;
1313   my $message = "@_ at $file line $line\n";
1314   Log (undef, $message);
1315   freeze() if @jobstep_todo;
1316   collate_output() if @jobstep_todo;
1317   cleanup();
1318   save_meta() if $local_logfile;
1319   die;
1320 }
1321
1322
1323 sub cleanup
1324 {
1325   return if !$job_has_uuid;
1326   $Job->update_attributes('running' => 0,
1327                           'success' => 0,
1328                           'finished_at' => scalar gmtime);
1329 }
1330
1331
1332 sub save_meta
1333 {
1334   my $justcheckpoint = shift; # false if this will be the last meta saved
1335   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1336
1337   $local_logfile->flush;
1338   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1339       . quotemeta($local_logfile->filename);
1340   my $loglocator = `$cmd`;
1341   die "system $cmd failed: $?" if $?;
1342   chomp($loglocator);
1343
1344   $local_logfile = undef;   # the temp file is automatically deleted
1345   Log (undef, "log manifest is $loglocator");
1346   $Job->{'log'} = $loglocator;
1347   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1348 }
1349
1350
1351 sub freeze_if_want_freeze
1352 {
1353   if ($main::please_freeze)
1354   {
1355     release_allocation();
1356     if (@_)
1357     {
1358       # kill some srun procs before freeze+stop
1359       map { $proc{$_} = {} } @_;
1360       while (%proc)
1361       {
1362         killem (keys %proc);
1363         select (undef, undef, undef, 0.1);
1364         my $died;
1365         while (($died = waitpid (-1, WNOHANG)) > 0)
1366         {
1367           delete $proc{$died};
1368         }
1369       }
1370     }
1371     freeze();
1372     collate_output();
1373     cleanup();
1374     save_meta();
1375     exit 0;
1376   }
1377 }
1378
1379
1380 sub freeze
1381 {
1382   Log (undef, "Freeze not implemented");
1383   return;
1384 }
1385
1386
1387 sub thaw
1388 {
1389   croak ("Thaw not implemented");
1390 }
1391
1392
1393 sub freezequote
1394 {
1395   my $s = shift;
1396   $s =~ s/\\/\\\\/g;
1397   $s =~ s/\n/\\n/g;
1398   return $s;
1399 }
1400
1401
1402 sub freezeunquote
1403 {
1404   my $s = shift;
1405   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1406   return $s;
1407 }
1408
1409
1410 sub srun
1411 {
1412   my $srunargs = shift;
1413   my $execargs = shift;
1414   my $opts = shift || {};
1415   my $stdin = shift;
1416   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1417   print STDERR (join (" ",
1418                       map { / / ? "'$_'" : $_ }
1419                       (@$args)),
1420                 "\n")
1421       if $ENV{CRUNCH_DEBUG};
1422
1423   if (defined $stdin) {
1424     my $child = open STDIN, "-|";
1425     defined $child or die "no fork: $!";
1426     if ($child == 0) {
1427       print $stdin or die $!;
1428       close STDOUT or die $!;
1429       exit 0;
1430     }
1431   }
1432
1433   return system (@$args) if $opts->{fork};
1434
1435   exec @$args;
1436   warn "ENV size is ".length(join(" ",%ENV));
1437   die "exec failed: $!: @$args";
1438 }
1439
1440
1441 sub ban_node_by_slot {
1442   # Don't start any new jobsteps on this node for 60 seconds
1443   my $slotid = shift;
1444   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1445   $slot[$slotid]->{node}->{hold_count}++;
1446   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1447 }
1448
1449 sub must_lock_now
1450 {
1451   my ($lockfile, $error_message) = @_;
1452   open L, ">", $lockfile or croak("$lockfile: $!");
1453   if (!flock L, LOCK_EX|LOCK_NB) {
1454     croak("Can't lock $lockfile: $error_message\n");
1455   }
1456 }
1457
1458 sub find_docker_image {
1459   # Given a Keep locator, check to see if it contains a Docker image.
1460   # If so, return its stream name and Docker hash.
1461   # If not, return undef for both values.
1462   my $locator = shift;
1463   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1464     my @file_list = @{$image->{files}};
1465     if ((scalar(@file_list) == 1) &&
1466         ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1467       return ($file_list[0][0], $1);
1468     }
1469   }
1470   return (undef, undef);
1471 }
1472
1473 __DATA__
1474 #!/usr/bin/perl
1475
1476 # checkout-and-build
1477
1478 use Fcntl ':flock';
1479 use File::Path qw( make_path );
1480
1481 my $destdir = $ENV{"CRUNCH_SRC"};
1482 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1483 my $repo = $ENV{"CRUNCH_SRC_URL"};
1484 my $task_work = $ENV{"TASK_WORK"};
1485
1486 for my $dir ($destdir, $task_work) {
1487     if ($dir) {
1488         make_path $dir;
1489         -e $dir or die "Failed to create temporary directory ($dir): $!";
1490     }
1491 }
1492
1493 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1494 flock L, LOCK_EX;
1495 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1496     if (@ARGV) {
1497         exec(@ARGV);
1498         die "Cannot exec `@ARGV`: $!";
1499     } else {
1500         exit 0;
1501     }
1502 }
1503
1504 unlink "$destdir.commit";
1505 open STDOUT, ">", "$destdir.log";
1506 open STDERR, ">&STDOUT";
1507
1508 mkdir $destdir;
1509 my @git_archive_data = <DATA>;
1510 if (@git_archive_data) {
1511   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1512   print TARX @git_archive_data;
1513   if(!close(TARX)) {
1514     die "'tar -C $destdir -xf -' exited $?: $!";
1515   }
1516 }
1517
1518 my $pwd;
1519 chomp ($pwd = `pwd`);
1520 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1521 mkdir $install_dir;
1522
1523 for my $src_path ("$destdir/arvados/sdk/python") {
1524   if (-d $src_path) {
1525     shell_or_die ("virtualenv", $install_dir);
1526     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1527   }
1528 }
1529
1530 if (-e "$destdir/crunch_scripts/install") {
1531     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1532 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1533     # Old version
1534     shell_or_die ("./tests/autotests.sh", $install_dir);
1535 } elsif (-e "./install.sh") {
1536     shell_or_die ("./install.sh", $install_dir);
1537 }
1538
1539 if ($commit) {
1540     unlink "$destdir.commit.new";
1541     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1542     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1543 }
1544
1545 close L;
1546
1547 if (@ARGV) {
1548     exec(@ARGV);
1549     die "Cannot exec `@ARGV`: $!";
1550 } else {
1551     exit 0;
1552 }
1553
1554 sub shell_or_die
1555 {
1556   if ($ENV{"DEBUG"}) {
1557     print STDERR "@_\n";
1558   }
1559   system (@_) == 0
1560       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1561 }
1562
1563 __DATA__