3782: Merge branch 'master' into 3782-stub-io-pipe
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
130
131
132 $SIG{'USR1'} = sub
133 {
134   $main::ENV{CRUNCH_DEBUG} = 1;
135 };
136 $SIG{'USR2'} = sub
137 {
138   $main::ENV{CRUNCH_DEBUG} = 0;
139 };
140
141
142
143 my $arv = Arvados->new('apiVersion' => 'v1');
144 my $local_logfile;
145
146 my $User = $arv->{'users'}->{'current'}->execute;
147
148 my $Job = {};
149 my $job_id;
150 my $dbh;
151 my $sth;
152 if ($job_has_uuid)
153 {
154   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155   if (!$force_unlock) {
156     # Claim this job, and make sure nobody else does
157     eval {
158       # lock() sets is_locked_by_uuid and changes state to Running.
159       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
160     };
161     if ($@) {
162       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
163       exit EX_TEMPFAIL;
164     };
165   }
166 }
167 else
168 {
169   $Job = JSON::decode_json($jobspec);
170
171   if (!$resume_stash)
172   {
173     map { croak ("No $_ specified") unless $Job->{$_} }
174     qw(script script_version script_parameters);
175   }
176
177   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
178   $Job->{'started_at'} = gmtime;
179
180   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
181
182   $job_has_uuid = 1;
183 }
184 $job_id = $Job->{'uuid'};
185
186 my $keep_logfile = $job_id . '.log.txt';
187 $local_logfile = File::Temp->new();
188
189 $Job->{'runtime_constraints'} ||= {};
190 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
191 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
192
193
194 Log (undef, "check slurm allocation");
195 my @slot;
196 my @node;
197 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
198 my @sinfo;
199 if (!$have_slurm)
200 {
201   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
202   push @sinfo, "$localcpus localhost";
203 }
204 if (exists $ENV{SLURM_NODELIST})
205 {
206   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
207 }
208 foreach (@sinfo)
209 {
210   my ($ncpus, $slurm_nodelist) = split;
211   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
212
213   my @nodelist;
214   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
215   {
216     my $nodelist = $1;
217     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
218     {
219       my $ranges = $1;
220       foreach (split (",", $ranges))
221       {
222         my ($a, $b);
223         if (/(\d+)-(\d+)/)
224         {
225           $a = $1;
226           $b = $2;
227         }
228         else
229         {
230           $a = $_;
231           $b = $_;
232         }
233         push @nodelist, map {
234           my $n = $nodelist;
235           $n =~ s/\[[-,\d]+\]/$_/;
236           $n;
237         } ($a..$b);
238       }
239     }
240     else
241     {
242       push @nodelist, $nodelist;
243     }
244   }
245   foreach my $nodename (@nodelist)
246   {
247     Log (undef, "node $nodename - $ncpus slots");
248     my $node = { name => $nodename,
249                  ncpus => $ncpus,
250                  losing_streak => 0,
251                  hold_until => 0 };
252     foreach my $cpu (1..$ncpus)
253     {
254       push @slot, { node => $node,
255                     cpu => $cpu };
256     }
257   }
258   push @node, @nodelist;
259 }
260
261
262
263 # Ensure that we get one jobstep running on each allocated node before
264 # we start overloading nodes with concurrent steps
265
266 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
267
268
269 $Job->update_attributes(
270   'tasks_summary' => { 'failed' => 0,
271                        'todo' => 1,
272                        'running' => 0,
273                        'done' => 0 });
274
275 Log (undef, "start");
276 $SIG{'INT'} = sub { $main::please_freeze = 1; };
277 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
278 $SIG{'TERM'} = \&croak;
279 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
280 $SIG{'ALRM'} = sub { $main::please_info = 1; };
281 $SIG{'CONT'} = sub { $main::please_continue = 1; };
282 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
283
284 $main::please_freeze = 0;
285 $main::please_info = 0;
286 $main::please_continue = 0;
287 $main::please_refresh = 0;
288 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
289
290 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
291 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
292 $ENV{"JOB_UUID"} = $job_id;
293
294
295 my @jobstep;
296 my @jobstep_todo = ();
297 my @jobstep_done = ();
298 my @jobstep_tomerge = ();
299 my $jobstep_tomerge_level = 0;
300 my $squeue_checked;
301 my $squeue_kill_checked;
302 my $output_in_keep = 0;
303 my $latest_refresh = scalar time;
304
305
306
307 if (defined $Job->{thawedfromkey})
308 {
309   thaw ($Job->{thawedfromkey});
310 }
311 else
312 {
313   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
314     'job_uuid' => $Job->{'uuid'},
315     'sequence' => 0,
316     'qsequence' => 0,
317     'parameters' => {},
318                                                           });
319   push @jobstep, { 'level' => 0,
320                    'failures' => 0,
321                    'arvados_task' => $first_task,
322                  };
323   push @jobstep_todo, 0;
324 }
325
326
327 if (!$have_slurm)
328 {
329   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
330 }
331
332
333 my $build_script;
334
335
336 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
337
338 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
339 if ($skip_install)
340 {
341   if (!defined $no_clear_tmp) {
342     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
343     system($clear_tmp_cmd) == 0
344         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
345   }
346   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
347   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
348     if (-d $src_path) {
349       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
350           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
351       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
352           == 0
353           or croak ("setup.py in $src_path failed: exit ".($?>>8));
354     }
355   }
356 }
357 else
358 {
359   do {
360     local $/ = undef;
361     $build_script = <DATA>;
362   };
363   Log (undef, "Install revision ".$Job->{script_version});
364   my $nodelist = join(",", @node);
365
366   if (!defined $no_clear_tmp) {
367     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
368
369     my $cleanpid = fork();
370     if ($cleanpid == 0)
371     {
372       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
373             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
374       exit (1);
375     }
376     while (1)
377     {
378       last if $cleanpid == waitpid (-1, WNOHANG);
379       freeze_if_want_freeze ($cleanpid);
380       select (undef, undef, undef, 0.1);
381     }
382     Log (undef, "Clean-work-dir exited $?");
383   }
384
385   # Install requested code version
386
387   my @execargs;
388   my @srunargs = ("srun",
389                   "--nodelist=$nodelist",
390                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
391
392   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
393   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
394
395   my $commit;
396   my $git_archive;
397   my $treeish = $Job->{'script_version'};
398
399   # If we're running under crunch-dispatch, it will have pulled the
400   # appropriate source tree into its own repository, and given us that
401   # repo's path as $git_dir. If we're running a "local" job, and a
402   # script_version was specified, it's up to the user to provide the
403   # full path to a local repository in Job->{repository}.
404   #
405   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
406   # git-archive --remote where appropriate.
407   #
408   # TODO: Accept a locally-hosted Arvados repository by name or
409   # UUID. Use arvados.v1.repositories.list or .get to figure out the
410   # appropriate fetch-url.
411   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
412
413   $ENV{"CRUNCH_SRC_URL"} = $repo;
414
415   if (-d "$repo/.git") {
416     # We were given a working directory, but we are only interested in
417     # the index.
418     $repo = "$repo/.git";
419   }
420
421   # If this looks like a subversion r#, look for it in git-svn commit messages
422
423   if ($treeish =~ m{^\d{1,4}$}) {
424     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
425     chomp $gitlog;
426     Log(undef, "git Subversion search exited $?");
427     if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
428       $commit = $gitlog;
429       Log(undef, "Using commit $commit for Subversion revision $treeish");
430     }
431   }
432
433   # If that didn't work, try asking git to look it up as a tree-ish.
434
435   if (!defined $commit) {
436     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
437     chomp $found;
438     Log(undef, "git rev-list exited $? with result '$found'");
439     if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
440       $commit = $found;
441       Log(undef, "Using commit $commit for tree-ish $treeish");
442       if ($commit ne $treeish) {
443         # Make sure we record the real commit id in the database,
444         # frozentokey, logs, etc. -- instead of an abbreviation or a
445         # branch name which can become ambiguous or point to a
446         # different commit in the future.
447         $Job->{'script_version'} = $commit;
448         !$job_has_uuid or
449             $Job->update_attributes('script_version' => $commit) or
450             croak("Error while updating job");
451       }
452     }
453   }
454
455   if (defined $commit) {
456     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
457     @execargs = ("sh", "-c",
458                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
459     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
460     croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
461   }
462   else {
463     croak ("could not figure out commit id for $treeish");
464   }
465
466   # Note: this section is almost certainly unnecessary if we're
467   # running tasks in docker containers.
468   my $installpid = fork();
469   if ($installpid == 0)
470   {
471     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
472     exit (1);
473   }
474   while (1)
475   {
476     last if $installpid == waitpid (-1, WNOHANG);
477     freeze_if_want_freeze ($installpid);
478     select (undef, undef, undef, 0.1);
479   }
480   Log (undef, "Install exited $?");
481 }
482
483 if (!$have_slurm)
484 {
485   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
486   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
487 }
488
489 # If this job requires a Docker image, install that.
490 my $docker_bin = "/usr/bin/docker.io";
491 my ($docker_locator, $docker_stream, $docker_hash);
492 if ($docker_locator = $Job->{docker_image_locator}) {
493   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
494   if (!$docker_hash)
495   {
496     croak("No Docker image hash found from locator $docker_locator");
497   }
498   $docker_stream =~ s/^\.//;
499   my $docker_install_script = qq{
500 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
501     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
502 fi
503 };
504   my $docker_pid = fork();
505   if ($docker_pid == 0)
506   {
507     srun (["srun", "--nodelist=" . join(',', @node)],
508           ["/bin/sh", "-ec", $docker_install_script]);
509     exit ($?);
510   }
511   while (1)
512   {
513     last if $docker_pid == waitpid (-1, WNOHANG);
514     freeze_if_want_freeze ($docker_pid);
515     select (undef, undef, undef, 0.1);
516   }
517   if ($? != 0)
518   {
519     croak("Installing Docker image from $docker_locator returned exit code $?");
520   }
521 }
522
523 foreach (qw (script script_version script_parameters runtime_constraints))
524 {
525   Log (undef,
526        "$_ " .
527        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
528 }
529 foreach (split (/\n/, $Job->{knobs}))
530 {
531   Log (undef, "knob " . $_);
532 }
533
534
535
536 $main::success = undef;
537
538
539
540 ONELEVEL:
541
542 my $thisround_succeeded = 0;
543 my $thisround_failed = 0;
544 my $thisround_failed_multiple = 0;
545
546 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
547                        or $a <=> $b } @jobstep_todo;
548 my $level = $jobstep[$jobstep_todo[0]]->{level};
549 Log (undef, "start level $level");
550
551
552
553 my %proc;
554 my @freeslot = (0..$#slot);
555 my @holdslot;
556 my %reader;
557 my $progress_is_dirty = 1;
558 my $progress_stats_updated = 0;
559
560 update_progress_stats();
561
562
563
564 THISROUND:
565 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
566 {
567   my $id = $jobstep_todo[$todo_ptr];
568   my $Jobstep = $jobstep[$id];
569   if ($Jobstep->{level} != $level)
570   {
571     next;
572   }
573
574   pipe $reader{$id}, "writer" or croak ($!);
575   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
576   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
577
578   my $childslot = $freeslot[0];
579   my $childnode = $slot[$childslot]->{node};
580   my $childslotname = join (".",
581                             $slot[$childslot]->{node}->{name},
582                             $slot[$childslot]->{cpu});
583   my $childpid = fork();
584   if ($childpid == 0)
585   {
586     $SIG{'INT'} = 'DEFAULT';
587     $SIG{'QUIT'} = 'DEFAULT';
588     $SIG{'TERM'} = 'DEFAULT';
589
590     foreach (values (%reader))
591     {
592       close($_);
593     }
594     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
595     open(STDOUT,">&writer");
596     open(STDERR,">&writer");
597
598     undef $dbh;
599     undef $sth;
600
601     delete $ENV{"GNUPGHOME"};
602     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
603     $ENV{"TASK_QSEQUENCE"} = $id;
604     $ENV{"TASK_SEQUENCE"} = $level;
605     $ENV{"JOB_SCRIPT"} = $Job->{script};
606     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
607       $param =~ tr/a-z/A-Z/;
608       $ENV{"JOB_PARAMETER_$param"} = $value;
609     }
610     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
611     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
612     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
613     $ENV{"HOME"} = $ENV{"TASK_WORK"};
614     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
615     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
616     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
617     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
618
619     $ENV{"GZIP"} = "-n";
620
621     my @srunargs = (
622       "srun",
623       "--nodelist=".$childnode->{name},
624       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
625       "--job-name=$job_id.$id.$$",
626         );
627     my $build_script_to_send = "";
628     my $command =
629         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
630         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
631         ."&& cd $ENV{CRUNCH_TMP} ";
632     if ($build_script)
633     {
634       $build_script_to_send = $build_script;
635       $command .=
636           "&& perl -";
637     }
638     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
639     if ($docker_hash)
640     {
641       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
642       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
643       # Dynamically configure the container to use the host system as its
644       # DNS server.  Get the host's global addresses from the ip command,
645       # and turn them into docker --dns options using gawk.
646       $command .=
647           q{$(ip -o address show scope global |
648               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
649       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
650       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
651       $command .= "--env=\QHOME=/home/crunch\E ";
652       while (my ($env_key, $env_val) = each %ENV)
653       {
654         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
655           if ($env_key eq "TASK_WORK") {
656             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
657           }
658           elsif ($env_key eq "TASK_KEEPMOUNT") {
659             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
660           }
661           else {
662             $command .= "--env=\Q$env_key=$env_val\E ";
663           }
664         }
665       }
666       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
667       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
668       $command .= "\Q$docker_hash\E ";
669       $command .= "stdbuf --output=0 --error=0 ";
670       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
671     } else {
672       # Non-docker run
673       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
674       $command .= "stdbuf --output=0 --error=0 ";
675       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
676     }
677
678     my @execargs = ('bash', '-c', $command);
679     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
680     # exec() failed, we assume nothing happened.
681     Log(undef, "srun() failed on build script");
682     die;
683   }
684   close("writer");
685   if (!defined $childpid)
686   {
687     close $reader{$id};
688     delete $reader{$id};
689     next;
690   }
691   shift @freeslot;
692   $proc{$childpid} = { jobstep => $id,
693                        time => time,
694                        slot => $childslot,
695                        jobstepname => "$job_id.$id.$childpid",
696                      };
697   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
698   $slot[$childslot]->{pid} = $childpid;
699
700   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
701   Log ($id, "child $childpid started on $childslotname");
702   $Jobstep->{starttime} = time;
703   $Jobstep->{node} = $childnode->{name};
704   $Jobstep->{slotindex} = $childslot;
705   delete $Jobstep->{stderr};
706   delete $Jobstep->{finishtime};
707
708   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
709   $Jobstep->{'arvados_task'}->save;
710
711   splice @jobstep_todo, $todo_ptr, 1;
712   --$todo_ptr;
713
714   $progress_is_dirty = 1;
715
716   while (!@freeslot
717          ||
718          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
719   {
720     last THISROUND if $main::please_freeze;
721     if ($main::please_info)
722     {
723       $main::please_info = 0;
724       freeze();
725       collate_output();
726       save_meta(1);
727       update_progress_stats();
728     }
729     my $gotsome
730         = readfrompipes ()
731         + reapchildren ();
732     if (!$gotsome)
733     {
734       check_refresh_wanted();
735       check_squeue();
736       update_progress_stats();
737       select (undef, undef, undef, 0.1);
738     }
739     elsif (time - $progress_stats_updated >= 30)
740     {
741       update_progress_stats();
742     }
743     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
744         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
745     {
746       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
747           .($thisround_failed+$thisround_succeeded)
748           .") -- giving up on this round";
749       Log (undef, $message);
750       last THISROUND;
751     }
752
753     # move slots from freeslot to holdslot (or back to freeslot) if necessary
754     for (my $i=$#freeslot; $i>=0; $i--) {
755       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
756         push @holdslot, (splice @freeslot, $i, 1);
757       }
758     }
759     for (my $i=$#holdslot; $i>=0; $i--) {
760       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
761         push @freeslot, (splice @holdslot, $i, 1);
762       }
763     }
764
765     # give up if no nodes are succeeding
766     if (!grep { $_->{node}->{losing_streak} == 0 &&
767                     $_->{node}->{hold_count} < 4 } @slot) {
768       my $message = "Every node has failed -- giving up on this round";
769       Log (undef, $message);
770       last THISROUND;
771     }
772   }
773 }
774
775
776 push @freeslot, splice @holdslot;
777 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
778
779
780 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
781 while (%proc)
782 {
783   if ($main::please_continue) {
784     $main::please_continue = 0;
785     goto THISROUND;
786   }
787   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
788   readfrompipes ();
789   if (!reapchildren())
790   {
791     check_refresh_wanted();
792     check_squeue();
793     update_progress_stats();
794     select (undef, undef, undef, 0.1);
795     killem (keys %proc) if $main::please_freeze;
796   }
797 }
798
799 update_progress_stats();
800 freeze_if_want_freeze();
801
802
803 if (!defined $main::success)
804 {
805   if (@jobstep_todo &&
806       $thisround_succeeded == 0 &&
807       ($thisround_failed == 0 || $thisround_failed > 4))
808   {
809     my $message = "stop because $thisround_failed tasks failed and none succeeded";
810     Log (undef, $message);
811     $main::success = 0;
812   }
813   if (!@jobstep_todo)
814   {
815     $main::success = 1;
816   }
817 }
818
819 goto ONELEVEL if !defined $main::success;
820
821
822 release_allocation();
823 freeze();
824 my $collated_output = &collate_output();
825
826 if (!$collated_output) {
827   Log(undef, "output undef");
828 }
829 else {
830   eval {
831     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
832         or die "failed to get collated manifest: $!";
833     my $orig_manifest_text = '';
834     while (my $manifest_line = <$orig_manifest>) {
835       $orig_manifest_text .= $manifest_line;
836     }
837     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
838       'manifest_text' => $orig_manifest_text,
839     });
840     Log(undef, "output uuid " . $output->{uuid});
841     Log(undef, "output hash " . $output->{portable_data_hash});
842     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
843   };
844   if ($@) {
845     Log (undef, "Failed to register output manifest: $@");
846   }
847 }
848
849 Log (undef, "finish");
850
851 save_meta();
852
853 if ($job_has_uuid) {
854   if ($collated_output && $main::success) {
855     $Job->update_attributes('state' => 'Complete')
856   } else {
857     $Job->update_attributes('state' => 'Failed')
858   }
859 }
860
861 exit ($Job->{'state'} != 'Complete' ? 1 : 0);
862
863
864
865 sub update_progress_stats
866 {
867   $progress_stats_updated = time;
868   return if !$progress_is_dirty;
869   my ($todo, $done, $running) = (scalar @jobstep_todo,
870                                  scalar @jobstep_done,
871                                  scalar @slot - scalar @freeslot - scalar @holdslot);
872   $Job->{'tasks_summary'} ||= {};
873   $Job->{'tasks_summary'}->{'todo'} = $todo;
874   $Job->{'tasks_summary'}->{'done'} = $done;
875   $Job->{'tasks_summary'}->{'running'} = $running;
876   if ($job_has_uuid) {
877     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
878   }
879   Log (undef, "status: $done done, $running running, $todo todo");
880   $progress_is_dirty = 0;
881 }
882
883
884
885 sub reapchildren
886 {
887   my $pid = waitpid (-1, WNOHANG);
888   return 0 if $pid <= 0;
889
890   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
891                   . "."
892                   . $slot[$proc{$pid}->{slot}]->{cpu});
893   my $jobstepid = $proc{$pid}->{jobstep};
894   my $elapsed = time - $proc{$pid}->{time};
895   my $Jobstep = $jobstep[$jobstepid];
896
897   my $childstatus = $?;
898   my $exitvalue = $childstatus >> 8;
899   my $exitinfo = sprintf("exit %d signal %d%s",
900                          $exitvalue,
901                          $childstatus & 127,
902                          ($childstatus & 128 ? ' core dump' : ''));
903   $Jobstep->{'arvados_task'}->reload;
904   my $task_success = $Jobstep->{'arvados_task'}->{success};
905
906   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
907
908   if (!defined $task_success) {
909     # task did not indicate one way or the other --> fail
910     $Jobstep->{'arvados_task'}->{success} = 0;
911     $Jobstep->{'arvados_task'}->save;
912     $task_success = 0;
913   }
914
915   if (!$task_success)
916   {
917     my $temporary_fail;
918     $temporary_fail ||= $Jobstep->{node_fail};
919     $temporary_fail ||= ($exitvalue == 111);
920
921     ++$thisround_failed;
922     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
923
924     # Check for signs of a failed or misconfigured node
925     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
926         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
927       # Don't count this against jobstep failure thresholds if this
928       # node is already suspected faulty and srun exited quickly
929       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
930           $elapsed < 5) {
931         Log ($jobstepid, "blaming failure on suspect node " .
932              $slot[$proc{$pid}->{slot}]->{node}->{name});
933         $temporary_fail ||= 1;
934       }
935       ban_node_by_slot($proc{$pid}->{slot});
936     }
937
938     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
939                              ++$Jobstep->{'failures'},
940                              $temporary_fail ? 'temporary ' : 'permanent',
941                              $elapsed));
942
943     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
944       # Give up on this task, and the whole job
945       $main::success = 0;
946       $main::please_freeze = 1;
947     }
948     else {
949       # Put this task back on the todo queue
950       push @jobstep_todo, $jobstepid;
951     }
952     $Job->{'tasks_summary'}->{'failed'}++;
953   }
954   else
955   {
956     ++$thisround_succeeded;
957     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
958     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
959     push @jobstep_done, $jobstepid;
960     Log ($jobstepid, "success in $elapsed seconds");
961   }
962   $Jobstep->{exitcode} = $childstatus;
963   $Jobstep->{finishtime} = time;
964   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
965   $Jobstep->{'arvados_task'}->save;
966   process_stderr ($jobstepid, $task_success);
967   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
968
969   close $reader{$jobstepid};
970   delete $reader{$jobstepid};
971   delete $slot[$proc{$pid}->{slot}]->{pid};
972   push @freeslot, $proc{$pid}->{slot};
973   delete $proc{$pid};
974
975   if ($task_success) {
976     # Load new tasks
977     my $newtask_list = [];
978     my $newtask_results;
979     do {
980       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
981         'where' => {
982           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
983         },
984         'order' => 'qsequence',
985         'offset' => scalar(@$newtask_list),
986       );
987       push(@$newtask_list, @{$newtask_results->{items}});
988     } while (@{$newtask_results->{items}});
989     foreach my $arvados_task (@$newtask_list) {
990       my $jobstep = {
991         'level' => $arvados_task->{'sequence'},
992         'failures' => 0,
993         'arvados_task' => $arvados_task
994       };
995       push @jobstep, $jobstep;
996       push @jobstep_todo, $#jobstep;
997     }
998   }
999
1000   $progress_is_dirty = 1;
1001   1;
1002 }
1003
1004 sub check_refresh_wanted
1005 {
1006   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1007   if (@stat && $stat[9] > $latest_refresh) {
1008     $latest_refresh = scalar time;
1009     if ($job_has_uuid) {
1010       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1011       for my $attr ('cancelled_at',
1012                     'cancelled_by_user_uuid',
1013                     'cancelled_by_client_uuid',
1014                     'state') {
1015         $Job->{$attr} = $Job2->{$attr};
1016       }
1017       if ($Job->{'state'} ne "Running") {
1018         if ($Job->{'state'} eq "Cancelled") {
1019           Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1020         } else {
1021           Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1022         }
1023         $main::success = 0;
1024         $main::please_freeze = 1;
1025       }
1026     }
1027   }
1028 }
1029
1030 sub check_squeue
1031 {
1032   # return if the kill list was checked <4 seconds ago
1033   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1034   {
1035     return;
1036   }
1037   $squeue_kill_checked = time;
1038
1039   # use killem() on procs whose killtime is reached
1040   for (keys %proc)
1041   {
1042     if (exists $proc{$_}->{killtime}
1043         && $proc{$_}->{killtime} <= time)
1044     {
1045       killem ($_);
1046     }
1047   }
1048
1049   # return if the squeue was checked <60 seconds ago
1050   if (defined $squeue_checked && $squeue_checked > time - 60)
1051   {
1052     return;
1053   }
1054   $squeue_checked = time;
1055
1056   if (!$have_slurm)
1057   {
1058     # here is an opportunity to check for mysterious problems with local procs
1059     return;
1060   }
1061
1062   # get a list of steps still running
1063   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1064   chop @squeue;
1065   if ($squeue[-1] ne "ok")
1066   {
1067     return;
1068   }
1069   pop @squeue;
1070
1071   # which of my jobsteps are running, according to squeue?
1072   my %ok;
1073   foreach (@squeue)
1074   {
1075     if (/^(\d+)\.(\d+) (\S+)/)
1076     {
1077       if ($1 eq $ENV{SLURM_JOBID})
1078       {
1079         $ok{$3} = 1;
1080       }
1081     }
1082   }
1083
1084   # which of my active child procs (>60s old) were not mentioned by squeue?
1085   foreach (keys %proc)
1086   {
1087     if ($proc{$_}->{time} < time - 60
1088         && !exists $ok{$proc{$_}->{jobstepname}}
1089         && !exists $proc{$_}->{killtime})
1090     {
1091       # kill this proc if it hasn't exited in 30 seconds
1092       $proc{$_}->{killtime} = time + 30;
1093     }
1094   }
1095 }
1096
1097
1098 sub release_allocation
1099 {
1100   if ($have_slurm)
1101   {
1102     Log (undef, "release job allocation");
1103     system "scancel $ENV{SLURM_JOBID}";
1104   }
1105 }
1106
1107
1108 sub readfrompipes
1109 {
1110   my $gotsome = 0;
1111   foreach my $job (keys %reader)
1112   {
1113     my $buf;
1114     while (0 < sysread ($reader{$job}, $buf, 8192))
1115     {
1116       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1117       $jobstep[$job]->{stderr} .= $buf;
1118       preprocess_stderr ($job);
1119       if (length ($jobstep[$job]->{stderr}) > 16384)
1120       {
1121         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1122       }
1123       $gotsome = 1;
1124     }
1125   }
1126   return $gotsome;
1127 }
1128
1129
1130 sub preprocess_stderr
1131 {
1132   my $job = shift;
1133
1134   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1135     my $line = $1;
1136     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1137     Log ($job, "stderr $line");
1138     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1139       # whoa.
1140       $main::please_freeze = 1;
1141     }
1142     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1143       $jobstep[$job]->{node_fail} = 1;
1144       ban_node_by_slot($jobstep[$job]->{slotindex});
1145     }
1146   }
1147 }
1148
1149
1150 sub process_stderr
1151 {
1152   my $job = shift;
1153   my $task_success = shift;
1154   preprocess_stderr ($job);
1155
1156   map {
1157     Log ($job, "stderr $_");
1158   } split ("\n", $jobstep[$job]->{stderr});
1159 }
1160
1161 sub fetch_block
1162 {
1163   my $hash = shift;
1164   my ($keep, $child_out, $output_block);
1165
1166   my $cmd = "arv-get \Q$hash\E";
1167   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1168   $output_block = '';
1169   while (1) {
1170     my $buf;
1171     my $bytes = sysread($keep, $buf, 1024 * 1024);
1172     if (!defined $bytes) {
1173       die "reading from arv-get: $!";
1174     } elsif ($bytes == 0) {
1175       # sysread returns 0 at the end of the pipe.
1176       last;
1177     } else {
1178       # some bytes were read into buf.
1179       $output_block .= $buf;
1180     }
1181   }
1182   close $keep;
1183   return $output_block;
1184 }
1185
1186 sub collate_output
1187 {
1188   Log (undef, "collate");
1189
1190   my ($child_out, $child_in);
1191   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1192                   '--retries', put_retry_count());
1193   my $joboutput;
1194   for (@jobstep)
1195   {
1196     next if (!exists $_->{'arvados_task'}->{'output'} ||
1197              !$_->{'arvados_task'}->{'success'});
1198     my $output = $_->{'arvados_task'}->{output};
1199     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1200     {
1201       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1202       print $child_in $output;
1203     }
1204     elsif (@jobstep == 1)
1205     {
1206       $joboutput = $output;
1207       last;
1208     }
1209     elsif (defined (my $outblock = fetch_block ($output)))
1210     {
1211       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1212       print $child_in $outblock;
1213     }
1214     else
1215     {
1216       Log (undef, "XXX fetch_block($output) failed XXX");
1217       $main::success = 0;
1218     }
1219   }
1220   $child_in->close;
1221
1222   if (!defined $joboutput) {
1223     my $s = IO::Select->new($child_out);
1224     if ($s->can_read(120)) {
1225       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1226       chomp($joboutput);
1227     } else {
1228       Log (undef, "timed out reading from 'arv-put'");
1229     }
1230   }
1231   waitpid($pid, 0);
1232
1233   return $joboutput;
1234 }
1235
1236
1237 sub killem
1238 {
1239   foreach (@_)
1240   {
1241     my $sig = 2;                # SIGINT first
1242     if (exists $proc{$_}->{"sent_$sig"} &&
1243         time - $proc{$_}->{"sent_$sig"} > 4)
1244     {
1245       $sig = 15;                # SIGTERM if SIGINT doesn't work
1246     }
1247     if (exists $proc{$_}->{"sent_$sig"} &&
1248         time - $proc{$_}->{"sent_$sig"} > 4)
1249     {
1250       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1251     }
1252     if (!exists $proc{$_}->{"sent_$sig"})
1253     {
1254       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1255       kill $sig, $_;
1256       select (undef, undef, undef, 0.1);
1257       if ($sig == 2)
1258       {
1259         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1260       }
1261       $proc{$_}->{"sent_$sig"} = time;
1262       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1263     }
1264   }
1265 }
1266
1267
1268 sub fhbits
1269 {
1270   my($bits);
1271   for (@_) {
1272     vec($bits,fileno($_),1) = 1;
1273   }
1274   $bits;
1275 }
1276
1277
1278 sub Log                         # ($jobstep_id, $logmessage)
1279 {
1280   if ($_[1] =~ /\n/) {
1281     for my $line (split (/\n/, $_[1])) {
1282       Log ($_[0], $line);
1283     }
1284     return;
1285   }
1286   my $fh = select STDERR; $|=1; select $fh;
1287   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1288   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1289   $message .= "\n";
1290   my $datetime;
1291   if ($local_logfile || -t STDERR) {
1292     my @gmtime = gmtime;
1293     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1294                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1295   }
1296   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1297
1298   if ($local_logfile) {
1299     print $local_logfile $datetime . " " . $message;
1300   }
1301 }
1302
1303
1304 sub croak
1305 {
1306   my ($package, $file, $line) = caller;
1307   my $message = "@_ at $file line $line\n";
1308   Log (undef, $message);
1309   freeze() if @jobstep_todo;
1310   collate_output() if @jobstep_todo;
1311   cleanup();
1312   save_meta() if $local_logfile;
1313   die;
1314 }
1315
1316
1317 sub cleanup
1318 {
1319   return if !$job_has_uuid;
1320   if ($Job->{'state'} eq 'Cancelled') {
1321     $Job->update_attributes('finished_at' => scalar gmtime);
1322   } else {
1323     $Job->update_attributes('state' => 'Failed');
1324   }
1325 }
1326
1327
1328 sub save_meta
1329 {
1330   my $justcheckpoint = shift; # false if this will be the last meta saved
1331   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1332
1333   $local_logfile->flush;
1334   my $retry_count = put_retry_count();
1335   my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1336       "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1337   my $loglocator = `$cmd`;
1338   die "system $cmd failed: $?" if $?;
1339   chomp($loglocator);
1340
1341   $local_logfile = undef;   # the temp file is automatically deleted
1342   Log (undef, "log manifest is $loglocator");
1343   $Job->{'log'} = $loglocator;
1344   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1345 }
1346
1347
1348 sub freeze_if_want_freeze
1349 {
1350   if ($main::please_freeze)
1351   {
1352     release_allocation();
1353     if (@_)
1354     {
1355       # kill some srun procs before freeze+stop
1356       map { $proc{$_} = {} } @_;
1357       while (%proc)
1358       {
1359         killem (keys %proc);
1360         select (undef, undef, undef, 0.1);
1361         my $died;
1362         while (($died = waitpid (-1, WNOHANG)) > 0)
1363         {
1364           delete $proc{$died};
1365         }
1366       }
1367     }
1368     freeze();
1369     collate_output();
1370     cleanup();
1371     save_meta();
1372     exit 0;
1373   }
1374 }
1375
1376
1377 sub freeze
1378 {
1379   Log (undef, "Freeze not implemented");
1380   return;
1381 }
1382
1383
1384 sub thaw
1385 {
1386   croak ("Thaw not implemented");
1387 }
1388
1389
1390 sub freezequote
1391 {
1392   my $s = shift;
1393   $s =~ s/\\/\\\\/g;
1394   $s =~ s/\n/\\n/g;
1395   return $s;
1396 }
1397
1398
1399 sub freezeunquote
1400 {
1401   my $s = shift;
1402   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1403   return $s;
1404 }
1405
1406
1407 sub srun
1408 {
1409   my $srunargs = shift;
1410   my $execargs = shift;
1411   my $opts = shift || {};
1412   my $stdin = shift;
1413   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1414   print STDERR (join (" ",
1415                       map { / / ? "'$_'" : $_ }
1416                       (@$args)),
1417                 "\n")
1418       if $ENV{CRUNCH_DEBUG};
1419
1420   if (defined $stdin) {
1421     my $child = open STDIN, "-|";
1422     defined $child or die "no fork: $!";
1423     if ($child == 0) {
1424       print $stdin or die $!;
1425       close STDOUT or die $!;
1426       exit 0;
1427     }
1428   }
1429
1430   return system (@$args) if $opts->{fork};
1431
1432   exec @$args;
1433   warn "ENV size is ".length(join(" ",%ENV));
1434   die "exec failed: $!: @$args";
1435 }
1436
1437
1438 sub ban_node_by_slot {
1439   # Don't start any new jobsteps on this node for 60 seconds
1440   my $slotid = shift;
1441   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1442   $slot[$slotid]->{node}->{hold_count}++;
1443   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1444 }
1445
1446 sub must_lock_now
1447 {
1448   my ($lockfile, $error_message) = @_;
1449   open L, ">", $lockfile or croak("$lockfile: $!");
1450   if (!flock L, LOCK_EX|LOCK_NB) {
1451     croak("Can't lock $lockfile: $error_message\n");
1452   }
1453 }
1454
1455 sub find_docker_image {
1456   # Given a Keep locator, check to see if it contains a Docker image.
1457   # If so, return its stream name and Docker hash.
1458   # If not, return undef for both values.
1459   my $locator = shift;
1460   my ($streamname, $filename);
1461   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1462     foreach my $line (split(/\n/, $image->{manifest_text})) {
1463       my @tokens = split(/\s+/, $line);
1464       next if (!@tokens);
1465       $streamname = shift(@tokens);
1466       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1467         if (defined($filename)) {
1468           return (undef, undef);  # More than one file in the Collection.
1469         } else {
1470           $filename = (split(/:/, $filedata, 3))[2];
1471         }
1472       }
1473     }
1474   }
1475   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1476     return ($streamname, $1);
1477   } else {
1478     return (undef, undef);
1479   }
1480 }
1481
1482 sub put_retry_count {
1483   # Calculate a --retries argument for arv-put that will have it try
1484   # approximately as long as this Job has been running.
1485   my $stoptime = shift || time;
1486   my $starttime = $jobstep[0]->{starttime};
1487   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1488   my $retries = 0;
1489   while ($timediff >= 2) {
1490     $retries++;
1491     $timediff /= 2;
1492   }
1493   return ($retries > 3) ? $retries : 3;
1494 }
1495
1496 __DATA__
1497 #!/usr/bin/perl
1498
1499 # checkout-and-build
1500
1501 use Fcntl ':flock';
1502 use File::Path qw( make_path );
1503
1504 my $destdir = $ENV{"CRUNCH_SRC"};
1505 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1506 my $repo = $ENV{"CRUNCH_SRC_URL"};
1507 my $task_work = $ENV{"TASK_WORK"};
1508
1509 for my $dir ($destdir, $task_work) {
1510     if ($dir) {
1511         make_path $dir;
1512         -e $dir or die "Failed to create temporary directory ($dir): $!";
1513     }
1514 }
1515
1516 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1517 flock L, LOCK_EX;
1518 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1519     if (@ARGV) {
1520         exec(@ARGV);
1521         die "Cannot exec `@ARGV`: $!";
1522     } else {
1523         exit 0;
1524     }
1525 }
1526
1527 unlink "$destdir.commit";
1528 open STDOUT, ">", "$destdir.log";
1529 open STDERR, ">&STDOUT";
1530
1531 mkdir $destdir;
1532 my @git_archive_data = <DATA>;
1533 if (@git_archive_data) {
1534   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1535   print TARX @git_archive_data;
1536   if(!close(TARX)) {
1537     die "'tar -C $destdir -xf -' exited $?: $!";
1538   }
1539 }
1540
1541 my $pwd;
1542 chomp ($pwd = `pwd`);
1543 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1544 mkdir $install_dir;
1545
1546 for my $src_path ("$destdir/arvados/sdk/python") {
1547   if (-d $src_path) {
1548     shell_or_die ("virtualenv", $install_dir);
1549     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1550   }
1551 }
1552
1553 if (-e "$destdir/crunch_scripts/install") {
1554     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1555 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1556     # Old version
1557     shell_or_die ("./tests/autotests.sh", $install_dir);
1558 } elsif (-e "./install.sh") {
1559     shell_or_die ("./install.sh", $install_dir);
1560 }
1561
1562 if ($commit) {
1563     unlink "$destdir.commit.new";
1564     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1565     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1566 }
1567
1568 close L;
1569
1570 if (@ARGV) {
1571     exec(@ARGV);
1572     die "Cannot exec `@ARGV`: $!";
1573 } else {
1574     exit 0;
1575 }
1576
1577 sub shell_or_die
1578 {
1579   if ($ENV{"DEBUG"}) {
1580     print STDERR "@_\n";
1581   }
1582   system (@_) == 0
1583       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1584 }
1585
1586 __DATA__