Merge branch '3822-ensure-unique-names' refs #3822
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 use constant EX_TEMPFAIL => 75;
88
89 $ENV{"TMPDIR"} ||= "/tmp";
90 unless (defined $ENV{"CRUNCH_TMP"}) {
91   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
92   if ($ENV{"USER"} ne "crunch" && $< != 0) {
93     # use a tmp dir unique for my uid
94     $ENV{"CRUNCH_TMP"} .= "-$<";
95   }
96 }
97
98 # Create the tmp directory if it does not exist
99 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
100   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 }
102
103 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
104 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
105 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
106 mkdir ($ENV{"JOB_WORK"});
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # If some other crunch-job process has grabbed this job (or we see
156     # other evidence that the job is already underway) we exit
157     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158     # mark the job as failed.
159     if ($Job->{'is_locked_by_uuid'}) {
160       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
161       exit EX_TEMPFAIL;
162     }
163     if ($Job->{'success'} ne undef) {
164       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
165       exit EX_TEMPFAIL;
166     }
167     if ($Job->{'running'}) {
168       Log(undef, "Job 'running' flag is already set");
169       exit EX_TEMPFAIL;
170     }
171     if ($Job->{'started_at'}) {
172       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
173       exit EX_TEMPFAIL;
174     }
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189
190   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191
192   $job_has_uuid = 1;
193 }
194 $job_id = $Job->{'uuid'};
195
196 my $keep_logfile = $job_id . '.log.txt';
197 $local_logfile = File::Temp->new();
198
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202
203
204 Log (undef, "check slurm allocation");
205 my @slot;
206 my @node;
207 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my @sinfo;
209 if (!$have_slurm)
210 {
211   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
212   push @sinfo, "$localcpus localhost";
213 }
214 if (exists $ENV{SLURM_NODELIST})
215 {
216   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 }
218 foreach (@sinfo)
219 {
220   my ($ncpus, $slurm_nodelist) = split;
221   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
222
223   my @nodelist;
224   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
225   {
226     my $nodelist = $1;
227     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
228     {
229       my $ranges = $1;
230       foreach (split (",", $ranges))
231       {
232         my ($a, $b);
233         if (/(\d+)-(\d+)/)
234         {
235           $a = $1;
236           $b = $2;
237         }
238         else
239         {
240           $a = $_;
241           $b = $_;
242         }
243         push @nodelist, map {
244           my $n = $nodelist;
245           $n =~ s/\[[-,\d]+\]/$_/;
246           $n;
247         } ($a..$b);
248       }
249     }
250     else
251     {
252       push @nodelist, $nodelist;
253     }
254   }
255   foreach my $nodename (@nodelist)
256   {
257     Log (undef, "node $nodename - $ncpus slots");
258     my $node = { name => $nodename,
259                  ncpus => $ncpus,
260                  losing_streak => 0,
261                  hold_until => 0 };
262     foreach my $cpu (1..$ncpus)
263     {
264       push @slot, { node => $node,
265                     cpu => $cpu };
266     }
267   }
268   push @node, @nodelist;
269 }
270
271
272
273 # Ensure that we get one jobstep running on each allocated node before
274 # we start overloading nodes with concurrent steps
275
276 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
277
278
279
280 my $jobmanager_id;
281 if ($job_has_uuid)
282 {
283   # Claim this job, and make sure nobody else does
284   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
285           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
286     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
287     exit EX_TEMPFAIL;
288   }
289   $Job->update_attributes('started_at' => scalar gmtime,
290                           'running' => 1,
291                           'success' => undef,
292                           'tasks_summary' => { 'failed' => 0,
293                                                'todo' => 1,
294                                                'running' => 0,
295                                                'done' => 0 });
296 }
297
298
299 Log (undef, "start");
300 $SIG{'INT'} = sub { $main::please_freeze = 1; };
301 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
302 $SIG{'TERM'} = \&croak;
303 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
304 $SIG{'ALRM'} = sub { $main::please_info = 1; };
305 $SIG{'CONT'} = sub { $main::please_continue = 1; };
306 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
307
308 $main::please_freeze = 0;
309 $main::please_info = 0;
310 $main::please_continue = 0;
311 $main::please_refresh = 0;
312 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
313
314 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
315 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
316 $ENV{"JOB_UUID"} = $job_id;
317
318
319 my @jobstep;
320 my @jobstep_todo = ();
321 my @jobstep_done = ();
322 my @jobstep_tomerge = ();
323 my $jobstep_tomerge_level = 0;
324 my $squeue_checked;
325 my $squeue_kill_checked;
326 my $output_in_keep = 0;
327 my $latest_refresh = scalar time;
328
329
330
331 if (defined $Job->{thawedfromkey})
332 {
333   thaw ($Job->{thawedfromkey});
334 }
335 else
336 {
337   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
338     'job_uuid' => $Job->{'uuid'},
339     'sequence' => 0,
340     'qsequence' => 0,
341     'parameters' => {},
342                                                           });
343   push @jobstep, { 'level' => 0,
344                    'failures' => 0,
345                    'arvados_task' => $first_task,
346                  };
347   push @jobstep_todo, 0;
348 }
349
350
351 if (!$have_slurm)
352 {
353   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
354 }
355
356
357 my $build_script;
358
359
360 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
361
362 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
363 if ($skip_install)
364 {
365   if (!defined $no_clear_tmp) {
366     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
367     system($clear_tmp_cmd) == 0
368         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
369   }
370   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
371   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
372     if (-d $src_path) {
373       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
374           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
375       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
376           == 0
377           or croak ("setup.py in $src_path failed: exit ".($?>>8));
378     }
379   }
380 }
381 else
382 {
383   do {
384     local $/ = undef;
385     $build_script = <DATA>;
386   };
387   Log (undef, "Install revision ".$Job->{script_version});
388   my $nodelist = join(",", @node);
389
390   if (!defined $no_clear_tmp) {
391     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
392
393     my $cleanpid = fork();
394     if ($cleanpid == 0)
395     {
396       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
397             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
398       exit (1);
399     }
400     while (1)
401     {
402       last if $cleanpid == waitpid (-1, WNOHANG);
403       freeze_if_want_freeze ($cleanpid);
404       select (undef, undef, undef, 0.1);
405     }
406     Log (undef, "Clean-work-dir exited $?");
407   }
408
409   # Install requested code version
410
411   my @execargs;
412   my @srunargs = ("srun",
413                   "--nodelist=$nodelist",
414                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
415
416   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
417   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
418
419   my $commit;
420   my $git_archive;
421   my $treeish = $Job->{'script_version'};
422
423   # If we're running under crunch-dispatch, it will have pulled the
424   # appropriate source tree into its own repository, and given us that
425   # repo's path as $git_dir. If we're running a "local" job, and a
426   # script_version was specified, it's up to the user to provide the
427   # full path to a local repository in Job->{repository}.
428   #
429   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
430   # git-archive --remote where appropriate.
431   #
432   # TODO: Accept a locally-hosted Arvados repository by name or
433   # UUID. Use arvados.v1.repositories.list or .get to figure out the
434   # appropriate fetch-url.
435   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
436
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   if (-d "$repo/.git") {
440     # We were given a working directory, but we are only interested in
441     # the index.
442     $repo = "$repo/.git";
443   }
444
445   # If this looks like a subversion r#, look for it in git-svn commit messages
446
447   if ($treeish =~ m{^\d{1,4}$}) {
448     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
449     chomp $gitlog;
450     if ($gitlog =~ /^[a-f0-9]{40}$/) {
451       $commit = $gitlog;
452       Log (undef, "Using commit $commit for script_version $treeish");
453     }
454   }
455
456   # If that didn't work, try asking git to look it up as a tree-ish.
457
458   if (!defined $commit) {
459     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
460     chomp $found;
461     if ($found =~ /^[0-9a-f]{40}$/s) {
462       $commit = $found;
463       if ($commit ne $treeish) {
464         # Make sure we record the real commit id in the database,
465         # frozentokey, logs, etc. -- instead of an abbreviation or a
466         # branch name which can become ambiguous or point to a
467         # different commit in the future.
468         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
469         Log (undef, "Using commit $commit for tree-ish $treeish");
470         if ($commit ne $treeish) {
471           $Job->{'script_version'} = $commit;
472           !$job_has_uuid or
473               $Job->update_attributes('script_version' => $commit) or
474               croak("Error while updating job");
475         }
476       }
477     }
478   }
479
480   if (defined $commit) {
481     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482     @execargs = ("sh", "-c",
483                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485   }
486   else {
487     croak ("could not figure out commit id for $treeish");
488   }
489
490   # Note: this section is almost certainly unnecessary if we're
491   # running tasks in docker containers.
492   my $installpid = fork();
493   if ($installpid == 0)
494   {
495     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
496     exit (1);
497   }
498   while (1)
499   {
500     last if $installpid == waitpid (-1, WNOHANG);
501     freeze_if_want_freeze ($installpid);
502     select (undef, undef, undef, 0.1);
503   }
504   Log (undef, "Install exited $?");
505 }
506
507 if (!$have_slurm)
508 {
509   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
510   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
511 }
512
513 # If this job requires a Docker image, install that.
514 my $docker_bin = "/usr/bin/docker.io";
515 my ($docker_locator, $docker_stream, $docker_hash);
516 if ($docker_locator = $Job->{docker_image_locator}) {
517   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
518   if (!$docker_hash)
519   {
520     croak("No Docker image hash found from locator $docker_locator");
521   }
522   $docker_stream =~ s/^\.//;
523   my $docker_install_script = qq{
524 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
525     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
526 fi
527 };
528   my $docker_pid = fork();
529   if ($docker_pid == 0)
530   {
531     srun (["srun", "--nodelist=" . join(',', @node)],
532           ["/bin/sh", "-ec", $docker_install_script]);
533     exit ($?);
534   }
535   while (1)
536   {
537     last if $docker_pid == waitpid (-1, WNOHANG);
538     freeze_if_want_freeze ($docker_pid);
539     select (undef, undef, undef, 0.1);
540   }
541   if ($? != 0)
542   {
543     croak("Installing Docker image from $docker_locator returned exit code $?");
544   }
545 }
546
547 foreach (qw (script script_version script_parameters runtime_constraints))
548 {
549   Log (undef,
550        "$_ " .
551        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
552 }
553 foreach (split (/\n/, $Job->{knobs}))
554 {
555   Log (undef, "knob " . $_);
556 }
557
558
559
560 $main::success = undef;
561
562
563
564 ONELEVEL:
565
566 my $thisround_succeeded = 0;
567 my $thisround_failed = 0;
568 my $thisround_failed_multiple = 0;
569
570 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
571                        or $a <=> $b } @jobstep_todo;
572 my $level = $jobstep[$jobstep_todo[0]]->{level};
573 Log (undef, "start level $level");
574
575
576
577 my %proc;
578 my @freeslot = (0..$#slot);
579 my @holdslot;
580 my %reader;
581 my $progress_is_dirty = 1;
582 my $progress_stats_updated = 0;
583
584 update_progress_stats();
585
586
587
588 THISROUND:
589 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
590 {
591   my $id = $jobstep_todo[$todo_ptr];
592   my $Jobstep = $jobstep[$id];
593   if ($Jobstep->{level} != $level)
594   {
595     next;
596   }
597
598   pipe $reader{$id}, "writer" or croak ($!);
599   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
600   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
601
602   my $childslot = $freeslot[0];
603   my $childnode = $slot[$childslot]->{node};
604   my $childslotname = join (".",
605                             $slot[$childslot]->{node}->{name},
606                             $slot[$childslot]->{cpu});
607   my $childpid = fork();
608   if ($childpid == 0)
609   {
610     $SIG{'INT'} = 'DEFAULT';
611     $SIG{'QUIT'} = 'DEFAULT';
612     $SIG{'TERM'} = 'DEFAULT';
613
614     foreach (values (%reader))
615     {
616       close($_);
617     }
618     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
619     open(STDOUT,">&writer");
620     open(STDERR,">&writer");
621
622     undef $dbh;
623     undef $sth;
624
625     delete $ENV{"GNUPGHOME"};
626     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
627     $ENV{"TASK_QSEQUENCE"} = $id;
628     $ENV{"TASK_SEQUENCE"} = $level;
629     $ENV{"JOB_SCRIPT"} = $Job->{script};
630     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
631       $param =~ tr/a-z/A-Z/;
632       $ENV{"JOB_PARAMETER_$param"} = $value;
633     }
634     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
635     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
636     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
637     $ENV{"HOME"} = $ENV{"TASK_WORK"};
638     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
639     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
640     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
641     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
642
643     $ENV{"GZIP"} = "-n";
644
645     my @srunargs = (
646       "srun",
647       "--nodelist=".$childnode->{name},
648       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
649       "--job-name=$job_id.$id.$$",
650         );
651     my $build_script_to_send = "";
652     my $command =
653         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
654         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
655         ."&& cd $ENV{CRUNCH_TMP} ";
656     if ($build_script)
657     {
658       $build_script_to_send = $build_script;
659       $command .=
660           "&& perl -";
661     }
662     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
663     if ($docker_hash)
664     {
665       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
666       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
667       # Dynamically configure the container to use the host system as its
668       # DNS server.  Get the host's global addresses from the ip command,
669       # and turn them into docker --dns options using gawk.
670       $command .=
671           q{$(ip -o address show scope global |
672               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
673       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
674       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
675       $command .= "--env=\QHOME=/home/crunch\E ";
676       while (my ($env_key, $env_val) = each %ENV)
677       {
678         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
679           if ($env_key eq "TASK_WORK") {
680             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
681           }
682           elsif ($env_key eq "TASK_KEEPMOUNT") {
683             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
684           }
685           else {
686             $command .= "--env=\Q$env_key=$env_val\E ";
687           }
688         }
689       }
690       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
691       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
692       $command .= "\Q$docker_hash\E ";
693       $command .= "stdbuf --output=0 --error=0 ";
694       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
695     } else {
696       # Non-docker run
697       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
698       $command .= "stdbuf --output=0 --error=0 ";
699       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
700     }
701
702     my @execargs = ('bash', '-c', $command);
703     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
704     # exec() failed, we assume nothing happened.
705     Log(undef, "srun() failed on build script");
706     die;
707   }
708   close("writer");
709   if (!defined $childpid)
710   {
711     close $reader{$id};
712     delete $reader{$id};
713     next;
714   }
715   shift @freeslot;
716   $proc{$childpid} = { jobstep => $id,
717                        time => time,
718                        slot => $childslot,
719                        jobstepname => "$job_id.$id.$childpid",
720                      };
721   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
722   $slot[$childslot]->{pid} = $childpid;
723
724   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
725   Log ($id, "child $childpid started on $childslotname");
726   $Jobstep->{starttime} = time;
727   $Jobstep->{node} = $childnode->{name};
728   $Jobstep->{slotindex} = $childslot;
729   delete $Jobstep->{stderr};
730   delete $Jobstep->{finishtime};
731
732   splice @jobstep_todo, $todo_ptr, 1;
733   --$todo_ptr;
734
735   $progress_is_dirty = 1;
736
737   while (!@freeslot
738          ||
739          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
740   {
741     last THISROUND if $main::please_freeze;
742     if ($main::please_info)
743     {
744       $main::please_info = 0;
745       freeze();
746       collate_output();
747       save_meta(1);
748       update_progress_stats();
749     }
750     my $gotsome
751         = readfrompipes ()
752         + reapchildren ();
753     if (!$gotsome)
754     {
755       check_refresh_wanted();
756       check_squeue();
757       update_progress_stats();
758       select (undef, undef, undef, 0.1);
759     }
760     elsif (time - $progress_stats_updated >= 30)
761     {
762       update_progress_stats();
763     }
764     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
765         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
766     {
767       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
768           .($thisround_failed+$thisround_succeeded)
769           .") -- giving up on this round";
770       Log (undef, $message);
771       last THISROUND;
772     }
773
774     # move slots from freeslot to holdslot (or back to freeslot) if necessary
775     for (my $i=$#freeslot; $i>=0; $i--) {
776       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
777         push @holdslot, (splice @freeslot, $i, 1);
778       }
779     }
780     for (my $i=$#holdslot; $i>=0; $i--) {
781       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
782         push @freeslot, (splice @holdslot, $i, 1);
783       }
784     }
785
786     # give up if no nodes are succeeding
787     if (!grep { $_->{node}->{losing_streak} == 0 &&
788                     $_->{node}->{hold_count} < 4 } @slot) {
789       my $message = "Every node has failed -- giving up on this round";
790       Log (undef, $message);
791       last THISROUND;
792     }
793   }
794 }
795
796
797 push @freeslot, splice @holdslot;
798 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
799
800
801 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
802 while (%proc)
803 {
804   if ($main::please_continue) {
805     $main::please_continue = 0;
806     goto THISROUND;
807   }
808   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
809   readfrompipes ();
810   if (!reapchildren())
811   {
812     check_refresh_wanted();
813     check_squeue();
814     update_progress_stats();
815     select (undef, undef, undef, 0.1);
816     killem (keys %proc) if $main::please_freeze;
817   }
818 }
819
820 update_progress_stats();
821 freeze_if_want_freeze();
822
823
824 if (!defined $main::success)
825 {
826   if (@jobstep_todo &&
827       $thisround_succeeded == 0 &&
828       ($thisround_failed == 0 || $thisround_failed > 4))
829   {
830     my $message = "stop because $thisround_failed tasks failed and none succeeded";
831     Log (undef, $message);
832     $main::success = 0;
833   }
834   if (!@jobstep_todo)
835   {
836     $main::success = 1;
837   }
838 }
839
840 goto ONELEVEL if !defined $main::success;
841
842
843 release_allocation();
844 freeze();
845 my $collated_output = &collate_output();
846
847 if ($job_has_uuid) {
848   $Job->update_attributes('running' => 0,
849                           'success' => $collated_output && $main::success,
850                           'finished_at' => scalar gmtime)
851 }
852
853 if (!$collated_output) {
854   Log(undef, "output undef");
855 }
856 else {
857   eval {
858     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
859         or die "failed to get collated manifest: $!";
860     my $orig_manifest_text = '';
861     while (my $manifest_line = <$orig_manifest>) {
862       $orig_manifest_text .= $manifest_line;
863     }
864     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
865       'manifest_text' => $orig_manifest_text,
866     });
867     Log(undef, "output uuid " . $output->{uuid});
868     Log(undef, "output hash " . $output->{portable_data_hash});
869     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
870   };
871   if ($@) {
872     Log (undef, "Failed to register output manifest: $@");
873   }
874 }
875
876 Log (undef, "finish");
877
878 save_meta();
879 exit ($Job->{'success'} ? 1 : 0);
880
881
882
883 sub update_progress_stats
884 {
885   $progress_stats_updated = time;
886   return if !$progress_is_dirty;
887   my ($todo, $done, $running) = (scalar @jobstep_todo,
888                                  scalar @jobstep_done,
889                                  scalar @slot - scalar @freeslot - scalar @holdslot);
890   $Job->{'tasks_summary'} ||= {};
891   $Job->{'tasks_summary'}->{'todo'} = $todo;
892   $Job->{'tasks_summary'}->{'done'} = $done;
893   $Job->{'tasks_summary'}->{'running'} = $running;
894   if ($job_has_uuid) {
895     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
896   }
897   Log (undef, "status: $done done, $running running, $todo todo");
898   $progress_is_dirty = 0;
899 }
900
901
902
903 sub reapchildren
904 {
905   my $pid = waitpid (-1, WNOHANG);
906   return 0 if $pid <= 0;
907
908   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
909                   . "."
910                   . $slot[$proc{$pid}->{slot}]->{cpu});
911   my $jobstepid = $proc{$pid}->{jobstep};
912   my $elapsed = time - $proc{$pid}->{time};
913   my $Jobstep = $jobstep[$jobstepid];
914
915   my $childstatus = $?;
916   my $exitvalue = $childstatus >> 8;
917   my $exitinfo = sprintf("exit %d signal %d%s",
918                          $exitvalue,
919                          $childstatus & 127,
920                          ($childstatus & 128 ? ' core dump' : ''));
921   $Jobstep->{'arvados_task'}->reload;
922   my $task_success = $Jobstep->{'arvados_task'}->{success};
923
924   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
925
926   if (!defined $task_success) {
927     # task did not indicate one way or the other --> fail
928     $Jobstep->{'arvados_task'}->{success} = 0;
929     $Jobstep->{'arvados_task'}->save;
930     $task_success = 0;
931   }
932
933   if (!$task_success)
934   {
935     my $temporary_fail;
936     $temporary_fail ||= $Jobstep->{node_fail};
937     $temporary_fail ||= ($exitvalue == 111);
938
939     ++$thisround_failed;
940     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
941
942     # Check for signs of a failed or misconfigured node
943     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
944         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
945       # Don't count this against jobstep failure thresholds if this
946       # node is already suspected faulty and srun exited quickly
947       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
948           $elapsed < 5) {
949         Log ($jobstepid, "blaming failure on suspect node " .
950              $slot[$proc{$pid}->{slot}]->{node}->{name});
951         $temporary_fail ||= 1;
952       }
953       ban_node_by_slot($proc{$pid}->{slot});
954     }
955
956     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
957                              ++$Jobstep->{'failures'},
958                              $temporary_fail ? 'temporary ' : 'permanent',
959                              $elapsed));
960
961     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
962       # Give up on this task, and the whole job
963       $main::success = 0;
964       $main::please_freeze = 1;
965     }
966     else {
967       # Put this task back on the todo queue
968       push @jobstep_todo, $jobstepid;
969     }
970     $Job->{'tasks_summary'}->{'failed'}++;
971   }
972   else
973   {
974     ++$thisround_succeeded;
975     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
976     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
977     push @jobstep_done, $jobstepid;
978     Log ($jobstepid, "success in $elapsed seconds");
979   }
980   $Jobstep->{exitcode} = $childstatus;
981   $Jobstep->{finishtime} = time;
982   process_stderr ($jobstepid, $task_success);
983   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
984
985   close $reader{$jobstepid};
986   delete $reader{$jobstepid};
987   delete $slot[$proc{$pid}->{slot}]->{pid};
988   push @freeslot, $proc{$pid}->{slot};
989   delete $proc{$pid};
990
991   if ($task_success) {
992     # Load new tasks
993     my $newtask_list = [];
994     my $newtask_results;
995     do {
996       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
997         'where' => {
998           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
999         },
1000         'order' => 'qsequence',
1001         'offset' => scalar(@$newtask_list),
1002       );
1003       push(@$newtask_list, @{$newtask_results->{items}});
1004     } while (@{$newtask_results->{items}});
1005     foreach my $arvados_task (@$newtask_list) {
1006       my $jobstep = {
1007         'level' => $arvados_task->{'sequence'},
1008         'failures' => 0,
1009         'arvados_task' => $arvados_task
1010       };
1011       push @jobstep, $jobstep;
1012       push @jobstep_todo, $#jobstep;
1013     }
1014   }
1015
1016   $progress_is_dirty = 1;
1017   1;
1018 }
1019
1020 sub check_refresh_wanted
1021 {
1022   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1023   if (@stat && $stat[9] > $latest_refresh) {
1024     $latest_refresh = scalar time;
1025     if ($job_has_uuid) {
1026       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1027       for my $attr ('cancelled_at',
1028                     'cancelled_by_user_uuid',
1029                     'cancelled_by_client_uuid') {
1030         $Job->{$attr} = $Job2->{$attr};
1031       }
1032       if ($Job->{'cancelled_at'}) {
1033         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1034              " by user " . $Job->{cancelled_by_user_uuid});
1035         $main::success = 0;
1036         $main::please_freeze = 1;
1037       }
1038     }
1039   }
1040 }
1041
1042 sub check_squeue
1043 {
1044   # return if the kill list was checked <4 seconds ago
1045   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1046   {
1047     return;
1048   }
1049   $squeue_kill_checked = time;
1050
1051   # use killem() on procs whose killtime is reached
1052   for (keys %proc)
1053   {
1054     if (exists $proc{$_}->{killtime}
1055         && $proc{$_}->{killtime} <= time)
1056     {
1057       killem ($_);
1058     }
1059   }
1060
1061   # return if the squeue was checked <60 seconds ago
1062   if (defined $squeue_checked && $squeue_checked > time - 60)
1063   {
1064     return;
1065   }
1066   $squeue_checked = time;
1067
1068   if (!$have_slurm)
1069   {
1070     # here is an opportunity to check for mysterious problems with local procs
1071     return;
1072   }
1073
1074   # get a list of steps still running
1075   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1076   chop @squeue;
1077   if ($squeue[-1] ne "ok")
1078   {
1079     return;
1080   }
1081   pop @squeue;
1082
1083   # which of my jobsteps are running, according to squeue?
1084   my %ok;
1085   foreach (@squeue)
1086   {
1087     if (/^(\d+)\.(\d+) (\S+)/)
1088     {
1089       if ($1 eq $ENV{SLURM_JOBID})
1090       {
1091         $ok{$3} = 1;
1092       }
1093     }
1094   }
1095
1096   # which of my active child procs (>60s old) were not mentioned by squeue?
1097   foreach (keys %proc)
1098   {
1099     if ($proc{$_}->{time} < time - 60
1100         && !exists $ok{$proc{$_}->{jobstepname}}
1101         && !exists $proc{$_}->{killtime})
1102     {
1103       # kill this proc if it hasn't exited in 30 seconds
1104       $proc{$_}->{killtime} = time + 30;
1105     }
1106   }
1107 }
1108
1109
1110 sub release_allocation
1111 {
1112   if ($have_slurm)
1113   {
1114     Log (undef, "release job allocation");
1115     system "scancel $ENV{SLURM_JOBID}";
1116   }
1117 }
1118
1119
1120 sub readfrompipes
1121 {
1122   my $gotsome = 0;
1123   foreach my $job (keys %reader)
1124   {
1125     my $buf;
1126     while (0 < sysread ($reader{$job}, $buf, 8192))
1127     {
1128       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1129       $jobstep[$job]->{stderr} .= $buf;
1130       preprocess_stderr ($job);
1131       if (length ($jobstep[$job]->{stderr}) > 16384)
1132       {
1133         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1134       }
1135       $gotsome = 1;
1136     }
1137   }
1138   return $gotsome;
1139 }
1140
1141
1142 sub preprocess_stderr
1143 {
1144   my $job = shift;
1145
1146   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1147     my $line = $1;
1148     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1149     Log ($job, "stderr $line");
1150     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1151       # whoa.
1152       $main::please_freeze = 1;
1153     }
1154     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1155       $jobstep[$job]->{node_fail} = 1;
1156       ban_node_by_slot($jobstep[$job]->{slotindex});
1157     }
1158   }
1159 }
1160
1161
1162 sub process_stderr
1163 {
1164   my $job = shift;
1165   my $task_success = shift;
1166   preprocess_stderr ($job);
1167
1168   map {
1169     Log ($job, "stderr $_");
1170   } split ("\n", $jobstep[$job]->{stderr});
1171 }
1172
1173 sub fetch_block
1174 {
1175   my $hash = shift;
1176   my ($keep, $child_out, $output_block);
1177
1178   my $cmd = "arv-get \Q$hash\E";
1179   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1180   $output_block = '';
1181   while (1) {
1182     my $buf;
1183     my $bytes = sysread($keep, $buf, 1024 * 1024);
1184     if (!defined $bytes) {
1185       die "reading from arv-get: $!";
1186     } elsif ($bytes == 0) {
1187       # sysread returns 0 at the end of the pipe.
1188       last;
1189     } else {
1190       # some bytes were read into buf.
1191       $output_block .= $buf;
1192     }
1193   }
1194   close $keep;
1195   return $output_block;
1196 }
1197
1198 sub collate_output
1199 {
1200   Log (undef, "collate");
1201
1202   my ($child_out, $child_in);
1203   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1204   my $joboutput;
1205   for (@jobstep)
1206   {
1207     next if (!exists $_->{'arvados_task'}->{'output'} ||
1208              !$_->{'arvados_task'}->{'success'});
1209     my $output = $_->{'arvados_task'}->{output};
1210     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1211     {
1212       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1213       print $child_in $output;
1214     }
1215     elsif (@jobstep == 1)
1216     {
1217       $joboutput = $output;
1218       last;
1219     }
1220     elsif (defined (my $outblock = fetch_block ($output)))
1221     {
1222       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1223       print $child_in $outblock;
1224     }
1225     else
1226     {
1227       Log (undef, "XXX fetch_block($output) failed XXX");
1228       $main::success = 0;
1229     }
1230   }
1231   $child_in->close;
1232
1233   if (!defined $joboutput) {
1234     my $s = IO::Select->new($child_out);
1235     if ($s->can_read(120)) {
1236       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1237       chomp($joboutput);
1238     } else {
1239       Log (undef, "timed out reading from 'arv-put'");
1240     }
1241   }
1242   waitpid($pid, 0);
1243
1244   return $joboutput;
1245 }
1246
1247
1248 sub killem
1249 {
1250   foreach (@_)
1251   {
1252     my $sig = 2;                # SIGINT first
1253     if (exists $proc{$_}->{"sent_$sig"} &&
1254         time - $proc{$_}->{"sent_$sig"} > 4)
1255     {
1256       $sig = 15;                # SIGTERM if SIGINT doesn't work
1257     }
1258     if (exists $proc{$_}->{"sent_$sig"} &&
1259         time - $proc{$_}->{"sent_$sig"} > 4)
1260     {
1261       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1262     }
1263     if (!exists $proc{$_}->{"sent_$sig"})
1264     {
1265       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1266       kill $sig, $_;
1267       select (undef, undef, undef, 0.1);
1268       if ($sig == 2)
1269       {
1270         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1271       }
1272       $proc{$_}->{"sent_$sig"} = time;
1273       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1274     }
1275   }
1276 }
1277
1278
1279 sub fhbits
1280 {
1281   my($bits);
1282   for (@_) {
1283     vec($bits,fileno($_),1) = 1;
1284   }
1285   $bits;
1286 }
1287
1288
1289 sub Log                         # ($jobstep_id, $logmessage)
1290 {
1291   if ($_[1] =~ /\n/) {
1292     for my $line (split (/\n/, $_[1])) {
1293       Log ($_[0], $line);
1294     }
1295     return;
1296   }
1297   my $fh = select STDERR; $|=1; select $fh;
1298   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1299   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1300   $message .= "\n";
1301   my $datetime;
1302   if ($local_logfile || -t STDERR) {
1303     my @gmtime = gmtime;
1304     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1305                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1306   }
1307   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1308
1309   if ($local_logfile) {
1310     print $local_logfile $datetime . " " . $message;
1311   }
1312 }
1313
1314
1315 sub croak
1316 {
1317   my ($package, $file, $line) = caller;
1318   my $message = "@_ at $file line $line\n";
1319   Log (undef, $message);
1320   freeze() if @jobstep_todo;
1321   collate_output() if @jobstep_todo;
1322   cleanup();
1323   save_meta() if $local_logfile;
1324   die;
1325 }
1326
1327
1328 sub cleanup
1329 {
1330   return if !$job_has_uuid;
1331   $Job->update_attributes('running' => 0,
1332                           'success' => 0,
1333                           'finished_at' => scalar gmtime);
1334 }
1335
1336
1337 sub save_meta
1338 {
1339   my $justcheckpoint = shift; # false if this will be the last meta saved
1340   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1341
1342   $local_logfile->flush;
1343   my $cmd = "arv-put --portable-data-hash --filename ''\Q$keep_logfile\E "
1344       . quotemeta($local_logfile->filename);
1345   my $loglocator = `$cmd`;
1346   die "system $cmd failed: $?" if $?;
1347   chomp($loglocator);
1348
1349   $local_logfile = undef;   # the temp file is automatically deleted
1350   Log (undef, "log manifest is $loglocator");
1351   $Job->{'log'} = $loglocator;
1352   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1353 }
1354
1355
1356 sub freeze_if_want_freeze
1357 {
1358   if ($main::please_freeze)
1359   {
1360     release_allocation();
1361     if (@_)
1362     {
1363       # kill some srun procs before freeze+stop
1364       map { $proc{$_} = {} } @_;
1365       while (%proc)
1366       {
1367         killem (keys %proc);
1368         select (undef, undef, undef, 0.1);
1369         my $died;
1370         while (($died = waitpid (-1, WNOHANG)) > 0)
1371         {
1372           delete $proc{$died};
1373         }
1374       }
1375     }
1376     freeze();
1377     collate_output();
1378     cleanup();
1379     save_meta();
1380     exit 0;
1381   }
1382 }
1383
1384
1385 sub freeze
1386 {
1387   Log (undef, "Freeze not implemented");
1388   return;
1389 }
1390
1391
1392 sub thaw
1393 {
1394   croak ("Thaw not implemented");
1395 }
1396
1397
1398 sub freezequote
1399 {
1400   my $s = shift;
1401   $s =~ s/\\/\\\\/g;
1402   $s =~ s/\n/\\n/g;
1403   return $s;
1404 }
1405
1406
1407 sub freezeunquote
1408 {
1409   my $s = shift;
1410   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1411   return $s;
1412 }
1413
1414
1415 sub srun
1416 {
1417   my $srunargs = shift;
1418   my $execargs = shift;
1419   my $opts = shift || {};
1420   my $stdin = shift;
1421   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1422   print STDERR (join (" ",
1423                       map { / / ? "'$_'" : $_ }
1424                       (@$args)),
1425                 "\n")
1426       if $ENV{CRUNCH_DEBUG};
1427
1428   if (defined $stdin) {
1429     my $child = open STDIN, "-|";
1430     defined $child or die "no fork: $!";
1431     if ($child == 0) {
1432       print $stdin or die $!;
1433       close STDOUT or die $!;
1434       exit 0;
1435     }
1436   }
1437
1438   return system (@$args) if $opts->{fork};
1439
1440   exec @$args;
1441   warn "ENV size is ".length(join(" ",%ENV));
1442   die "exec failed: $!: @$args";
1443 }
1444
1445
1446 sub ban_node_by_slot {
1447   # Don't start any new jobsteps on this node for 60 seconds
1448   my $slotid = shift;
1449   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1450   $slot[$slotid]->{node}->{hold_count}++;
1451   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1452 }
1453
1454 sub must_lock_now
1455 {
1456   my ($lockfile, $error_message) = @_;
1457   open L, ">", $lockfile or croak("$lockfile: $!");
1458   if (!flock L, LOCK_EX|LOCK_NB) {
1459     croak("Can't lock $lockfile: $error_message\n");
1460   }
1461 }
1462
1463 sub find_docker_image {
1464   # Given a Keep locator, check to see if it contains a Docker image.
1465   # If so, return its stream name and Docker hash.
1466   # If not, return undef for both values.
1467   my $locator = shift;
1468   my ($streamname, $filename);
1469   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1470     foreach my $line (split(/\n/, $image->{manifest_text})) {
1471       my @tokens = split(/\s+/, $line);
1472       next if (!@tokens);
1473       $streamname = shift(@tokens);
1474       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1475         if (defined($filename)) {
1476           return (undef, undef);  # More than one file in the Collection.
1477         } else {
1478           $filename = (split(/:/, $filedata, 3))[2];
1479         }
1480       }
1481     }
1482   }
1483   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1484     return ($streamname, $1);
1485   } else {
1486     return (undef, undef);
1487   }
1488 }
1489
1490 __DATA__
1491 #!/usr/bin/perl
1492
1493 # checkout-and-build
1494
1495 use Fcntl ':flock';
1496 use File::Path qw( make_path );
1497
1498 my $destdir = $ENV{"CRUNCH_SRC"};
1499 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1500 my $repo = $ENV{"CRUNCH_SRC_URL"};
1501 my $task_work = $ENV{"TASK_WORK"};
1502
1503 for my $dir ($destdir, $task_work) {
1504     if ($dir) {
1505         make_path $dir;
1506         -e $dir or die "Failed to create temporary directory ($dir): $!";
1507     }
1508 }
1509
1510 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1511 flock L, LOCK_EX;
1512 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1513     if (@ARGV) {
1514         exec(@ARGV);
1515         die "Cannot exec `@ARGV`: $!";
1516     } else {
1517         exit 0;
1518     }
1519 }
1520
1521 unlink "$destdir.commit";
1522 open STDOUT, ">", "$destdir.log";
1523 open STDERR, ">&STDOUT";
1524
1525 mkdir $destdir;
1526 my @git_archive_data = <DATA>;
1527 if (@git_archive_data) {
1528   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1529   print TARX @git_archive_data;
1530   if(!close(TARX)) {
1531     die "'tar -C $destdir -xf -' exited $?: $!";
1532   }
1533 }
1534
1535 my $pwd;
1536 chomp ($pwd = `pwd`);
1537 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1538 mkdir $install_dir;
1539
1540 for my $src_path ("$destdir/arvados/sdk/python") {
1541   if (-d $src_path) {
1542     shell_or_die ("virtualenv", $install_dir);
1543     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1544   }
1545 }
1546
1547 if (-e "$destdir/crunch_scripts/install") {
1548     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1549 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1550     # Old version
1551     shell_or_die ("./tests/autotests.sh", $install_dir);
1552 } elsif (-e "./install.sh") {
1553     shell_or_die ("./install.sh", $install_dir);
1554 }
1555
1556 if ($commit) {
1557     unlink "$destdir.commit.new";
1558     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1559     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1560 }
1561
1562 close L;
1563
1564 if (@ARGV) {
1565     exec(@ARGV);
1566     die "Cannot exec `@ARGV`: $!";
1567 } else {
1568     exit 0;
1569 }
1570
1571 sub shell_or_die
1572 {
1573   if ($ENV{"DEBUG"}) {
1574     print STDERR "@_\n";
1575   }
1576   system (@_) == 0
1577       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1578 }
1579
1580 __DATA__