462b78c5f8a8f056e0acd21a4bc25c1651ae0fda
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 $ENV{"TMPDIR"} ||= "/tmp";
88 unless (defined $ENV{"CRUNCH_TMP"}) {
89   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
90   if ($ENV{"USER"} ne "crunch" && $< != 0) {
91     # use a tmp dir unique for my uid
92     $ENV{"CRUNCH_TMP"} .= "-$<";
93   }
94 }
95
96 # Create the tmp directory if it does not exist
97 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
98   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
99 }
100
101 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
102 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
103 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
104 mkdir ($ENV{"JOB_WORK"});
105
106 my $force_unlock;
107 my $git_dir;
108 my $jobspec;
109 my $job_api_token;
110 my $no_clear_tmp;
111 my $resume_stash;
112 GetOptions('force-unlock' => \$force_unlock,
113            'git-dir=s' => \$git_dir,
114            'job=s' => \$jobspec,
115            'job-api-token=s' => \$job_api_token,
116            'no-clear-tmp' => \$no_clear_tmp,
117            'resume-stash=s' => \$resume_stash,
118     );
119
120 if (defined $job_api_token) {
121   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
122 }
123
124 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
125 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
126 my $local_job = !$job_has_uuid;
127
128
129 $SIG{'USR1'} = sub
130 {
131   $main::ENV{CRUNCH_DEBUG} = 1;
132 };
133 $SIG{'USR2'} = sub
134 {
135   $main::ENV{CRUNCH_DEBUG} = 0;
136 };
137
138
139
140 my $arv = Arvados->new('apiVersion' => 'v1');
141 my $local_logfile;
142
143 my $User = $arv->{'users'}->{'current'}->execute;
144
145 my $Job = {};
146 my $job_id;
147 my $dbh;
148 my $sth;
149 if ($job_has_uuid)
150 {
151   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
152   if (!$force_unlock) {
153     if ($Job->{'is_locked_by_uuid'}) {
154       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
155     }
156     if ($Job->{'success'} ne undef) {
157       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
158     }
159     if ($Job->{'running'}) {
160       croak("Job 'running' flag is already set");
161     }
162     if ($Job->{'started_at'}) {
163       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
164     }
165   }
166 }
167 else
168 {
169   $Job = JSON::decode_json($jobspec);
170
171   if (!$resume_stash)
172   {
173     map { croak ("No $_ specified") unless $Job->{$_} }
174     qw(script script_version script_parameters);
175   }
176
177   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
178   $Job->{'started_at'} = gmtime;
179
180   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
181
182   $job_has_uuid = 1;
183 }
184 $job_id = $Job->{'uuid'};
185
186 my $keep_logfile = $job_id . '.log.txt';
187 $local_logfile = File::Temp->new();
188
189 $Job->{'runtime_constraints'} ||= {};
190 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
191 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
192
193
194 Log (undef, "check slurm allocation");
195 my @slot;
196 my @node;
197 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
198 my @sinfo;
199 if (!$have_slurm)
200 {
201   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
202   push @sinfo, "$localcpus localhost";
203 }
204 if (exists $ENV{SLURM_NODELIST})
205 {
206   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
207 }
208 foreach (@sinfo)
209 {
210   my ($ncpus, $slurm_nodelist) = split;
211   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
212
213   my @nodelist;
214   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
215   {
216     my $nodelist = $1;
217     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
218     {
219       my $ranges = $1;
220       foreach (split (",", $ranges))
221       {
222         my ($a, $b);
223         if (/(\d+)-(\d+)/)
224         {
225           $a = $1;
226           $b = $2;
227         }
228         else
229         {
230           $a = $_;
231           $b = $_;
232         }
233         push @nodelist, map {
234           my $n = $nodelist;
235           $n =~ s/\[[-,\d]+\]/$_/;
236           $n;
237         } ($a..$b);
238       }
239     }
240     else
241     {
242       push @nodelist, $nodelist;
243     }
244   }
245   foreach my $nodename (@nodelist)
246   {
247     Log (undef, "node $nodename - $ncpus slots");
248     my $node = { name => $nodename,
249                  ncpus => $ncpus,
250                  losing_streak => 0,
251                  hold_until => 0 };
252     foreach my $cpu (1..$ncpus)
253     {
254       push @slot, { node => $node,
255                     cpu => $cpu };
256     }
257   }
258   push @node, @nodelist;
259 }
260
261
262
263 # Ensure that we get one jobstep running on each allocated node before
264 # we start overloading nodes with concurrent steps
265
266 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
267
268
269
270 my $jobmanager_id;
271 if ($job_has_uuid)
272 {
273   # Claim this job, and make sure nobody else does
274   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
275           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
276     croak("Error while updating / locking job");
277   }
278   $Job->update_attributes('started_at' => scalar gmtime,
279                           'running' => 1,
280                           'success' => undef,
281                           'tasks_summary' => { 'failed' => 0,
282                                                'todo' => 1,
283                                                'running' => 0,
284                                                'done' => 0 });
285 }
286
287
288 Log (undef, "start");
289 $SIG{'INT'} = sub { $main::please_freeze = 1; };
290 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
291 $SIG{'TERM'} = \&croak;
292 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
293 $SIG{'ALRM'} = sub { $main::please_info = 1; };
294 $SIG{'CONT'} = sub { $main::please_continue = 1; };
295 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
296
297 $main::please_freeze = 0;
298 $main::please_info = 0;
299 $main::please_continue = 0;
300 $main::please_refresh = 0;
301 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
302
303 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
304 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
305 $ENV{"JOB_UUID"} = $job_id;
306
307
308 my @jobstep;
309 my @jobstep_todo = ();
310 my @jobstep_done = ();
311 my @jobstep_tomerge = ();
312 my $jobstep_tomerge_level = 0;
313 my $squeue_checked;
314 my $squeue_kill_checked;
315 my $output_in_keep = 0;
316 my $latest_refresh = scalar time;
317
318
319
320 if (defined $Job->{thawedfromkey})
321 {
322   thaw ($Job->{thawedfromkey});
323 }
324 else
325 {
326   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
327     'job_uuid' => $Job->{'uuid'},
328     'sequence' => 0,
329     'qsequence' => 0,
330     'parameters' => {},
331                                                           });
332   push @jobstep, { 'level' => 0,
333                    'failures' => 0,
334                    'arvados_task' => $first_task,
335                  };
336   push @jobstep_todo, 0;
337 }
338
339
340 if (!$have_slurm)
341 {
342   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
343 }
344
345
346 my $build_script;
347
348
349 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
350
351 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
352 if ($skip_install)
353 {
354   if (!defined $no_clear_tmp) {
355     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
356     system($clear_tmp_cmd) == 0
357         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
358   }
359   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
360   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
361     if (-d $src_path) {
362       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
363           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
364       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
365           == 0
366           or croak ("setup.py in $src_path failed: exit ".($?>>8));
367     }
368   }
369 }
370 else
371 {
372   do {
373     local $/ = undef;
374     $build_script = <DATA>;
375   };
376   Log (undef, "Install revision ".$Job->{script_version});
377   my $nodelist = join(",", @node);
378
379   if (!defined $no_clear_tmp) {
380     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
381
382     my $cleanpid = fork();
383     if ($cleanpid == 0)
384     {
385       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
386             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
387       exit (1);
388     }
389     while (1)
390     {
391       last if $cleanpid == waitpid (-1, WNOHANG);
392       freeze_if_want_freeze ($cleanpid);
393       select (undef, undef, undef, 0.1);
394     }
395     Log (undef, "Clean-work-dir exited $?");
396   }
397
398   # Install requested code version
399
400   my @execargs;
401   my @srunargs = ("srun",
402                   "--nodelist=$nodelist",
403                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
404
405   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
406   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
407
408   my $commit;
409   my $git_archive;
410   my $treeish = $Job->{'script_version'};
411
412   # If we're running under crunch-dispatch, it will have pulled the
413   # appropriate source tree into its own repository, and given us that
414   # repo's path as $git_dir. If we're running a "local" job, and a
415   # script_version was specified, it's up to the user to provide the
416   # full path to a local repository in Job->{repository}.
417   #
418   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
419   # git-archive --remote where appropriate.
420   #
421   # TODO: Accept a locally-hosted Arvados repository by name or
422   # UUID. Use arvados.v1.repositories.list or .get to figure out the
423   # appropriate fetch-url.
424   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
425
426   $ENV{"CRUNCH_SRC_URL"} = $repo;
427
428   if (-d "$repo/.git") {
429     # We were given a working directory, but we are only interested in
430     # the index.
431     $repo = "$repo/.git";
432   }
433
434   # If this looks like a subversion r#, look for it in git-svn commit messages
435
436   if ($treeish =~ m{^\d{1,4}$}) {
437     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
438     chomp $gitlog;
439     if ($gitlog =~ /^[a-f0-9]{40}$/) {
440       $commit = $gitlog;
441       Log (undef, "Using commit $commit for script_version $treeish");
442     }
443   }
444
445   # If that didn't work, try asking git to look it up as a tree-ish.
446
447   if (!defined $commit) {
448     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
449     chomp $found;
450     if ($found =~ /^[0-9a-f]{40}$/s) {
451       $commit = $found;
452       if ($commit ne $treeish) {
453         # Make sure we record the real commit id in the database,
454         # frozentokey, logs, etc. -- instead of an abbreviation or a
455         # branch name which can become ambiguous or point to a
456         # different commit in the future.
457         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
458         Log (undef, "Using commit $commit for tree-ish $treeish");
459         if ($commit ne $treeish) {
460           $Job->{'script_version'} = $commit;
461           !$job_has_uuid or
462               $Job->update_attributes('script_version' => $commit) or
463               croak("Error while updating job");
464         }
465       }
466     }
467   }
468
469   if (defined $commit) {
470     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
471     @execargs = ("sh", "-c",
472                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
473     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
474   }
475   else {
476     croak ("could not figure out commit id for $treeish");
477   }
478
479   my $installpid = fork();
480   if ($installpid == 0)
481   {
482     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
483     exit (1);
484   }
485   while (1)
486   {
487     last if $installpid == waitpid (-1, WNOHANG);
488     freeze_if_want_freeze ($installpid);
489     select (undef, undef, undef, 0.1);
490   }
491   Log (undef, "Install exited $?");
492 }
493
494 if (!$have_slurm)
495 {
496   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
497   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
498 }
499
500 # If this job requires a Docker image, install that.
501 my $docker_bin = "/usr/bin/docker.io";
502 my ($docker_locator, $docker_hash);
503 if ($docker_locator = $Job->{docker_image_locator}) {
504   $docker_hash = find_docker_hash($docker_locator);
505   if (!$docker_hash)
506   {
507     croak("No Docker image hash found from locator $docker_locator");
508   }
509   my $docker_install_script = qq{
510 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
511     arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
512 fi
513 };
514   my $docker_pid = fork();
515   if ($docker_pid == 0)
516   {
517     srun (["srun", "--nodelist=" . join(',', @node)],
518           ["/bin/sh", "-ec", $docker_install_script]);
519     exit ($?);
520   }
521   while (1)
522   {
523     last if $docker_pid == waitpid (-1, WNOHANG);
524     freeze_if_want_freeze ($docker_pid);
525     select (undef, undef, undef, 0.1);
526   }
527   if ($? != 0)
528   {
529     croak("Installing Docker image from $docker_locator returned exit code $?");
530   }
531 }
532
533 foreach (qw (script script_version script_parameters runtime_constraints))
534 {
535   Log (undef,
536        "$_ " .
537        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
538 }
539 foreach (split (/\n/, $Job->{knobs}))
540 {
541   Log (undef, "knob " . $_);
542 }
543
544
545
546 $main::success = undef;
547
548
549
550 ONELEVEL:
551
552 my $thisround_succeeded = 0;
553 my $thisround_failed = 0;
554 my $thisround_failed_multiple = 0;
555
556 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
557                        or $a <=> $b } @jobstep_todo;
558 my $level = $jobstep[$jobstep_todo[0]]->{level};
559 Log (undef, "start level $level");
560
561
562
563 my %proc;
564 my @freeslot = (0..$#slot);
565 my @holdslot;
566 my %reader;
567 my $progress_is_dirty = 1;
568 my $progress_stats_updated = 0;
569
570 update_progress_stats();
571
572
573
574 THISROUND:
575 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
576 {
577   my $id = $jobstep_todo[$todo_ptr];
578   my $Jobstep = $jobstep[$id];
579   if ($Jobstep->{level} != $level)
580   {
581     next;
582   }
583
584   pipe $reader{$id}, "writer" or croak ($!);
585   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
586   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
587
588   my $childslot = $freeslot[0];
589   my $childnode = $slot[$childslot]->{node};
590   my $childslotname = join (".",
591                             $slot[$childslot]->{node}->{name},
592                             $slot[$childslot]->{cpu});
593   my $childpid = fork();
594   if ($childpid == 0)
595   {
596     $SIG{'INT'} = 'DEFAULT';
597     $SIG{'QUIT'} = 'DEFAULT';
598     $SIG{'TERM'} = 'DEFAULT';
599
600     foreach (values (%reader))
601     {
602       close($_);
603     }
604     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
605     open(STDOUT,">&writer");
606     open(STDERR,">&writer");
607
608     undef $dbh;
609     undef $sth;
610
611     delete $ENV{"GNUPGHOME"};
612     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
613     $ENV{"TASK_QSEQUENCE"} = $id;
614     $ENV{"TASK_SEQUENCE"} = $level;
615     $ENV{"JOB_SCRIPT"} = $Job->{script};
616     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
617       $param =~ tr/a-z/A-Z/;
618       $ENV{"JOB_PARAMETER_$param"} = $value;
619     }
620     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
621     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
622     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
623     $ENV{"HOME"} = $ENV{"TASK_WORK"};
624     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
625     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
626     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
627     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
628
629     $ENV{"GZIP"} = "-n";
630
631     my @srunargs = (
632       "srun",
633       "--nodelist=".$childnode->{name},
634       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
635       "--job-name=$job_id.$id.$$",
636         );
637     my $build_script_to_send = "";
638     my $command =
639         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
640         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
641         ."&& cd $ENV{CRUNCH_TMP} ";
642     if ($build_script)
643     {
644       $build_script_to_send = $build_script;
645       $command .=
646           "&& perl -";
647     }
648     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
649     if ($docker_hash)
650     {
651       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
652       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
653       # Dynamically configure the container to use the host system as its
654       # DNS server.  Get the host's global addresses from the ip command,
655       # and turn them into docker --dns options using gawk.
656       $command .=
657           q{$(ip -o address show scope global |
658               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
659       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
660       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
661       $command .= "--env=\QHOME=/home/crunch\E ";
662       while (my ($env_key, $env_val) = each %ENV)
663       {
664         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
665           if ($env_key eq "TASK_WORK") {
666             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
667           }
668           elsif ($env_key eq "TASK_KEEPMOUNT") {
669             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
670           }
671           elsif ($env_key eq "CRUNCH_SRC") {
672             $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
673           }
674           else {
675             $command .= "--env=\Q$env_key=$env_val\E ";
676           }
677         }
678       }
679       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
680       $command .= "\Q$docker_hash\E ";
681       $command .= "stdbuf --output=0 --error=0 ";
682       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
683     } else {
684       # Non-docker run
685       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
686       $command .= "stdbuf --output=0 --error=0 ";
687       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
688     }
689
690     my @execargs = ('bash', '-c', $command);
691     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
692     exit (111);
693   }
694   close("writer");
695   if (!defined $childpid)
696   {
697     close $reader{$id};
698     delete $reader{$id};
699     next;
700   }
701   shift @freeslot;
702   $proc{$childpid} = { jobstep => $id,
703                        time => time,
704                        slot => $childslot,
705                        jobstepname => "$job_id.$id.$childpid",
706                      };
707   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
708   $slot[$childslot]->{pid} = $childpid;
709
710   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
711   Log ($id, "child $childpid started on $childslotname");
712   $Jobstep->{starttime} = time;
713   $Jobstep->{node} = $childnode->{name};
714   $Jobstep->{slotindex} = $childslot;
715   delete $Jobstep->{stderr};
716   delete $Jobstep->{finishtime};
717
718   splice @jobstep_todo, $todo_ptr, 1;
719   --$todo_ptr;
720
721   $progress_is_dirty = 1;
722
723   while (!@freeslot
724          ||
725          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
726   {
727     last THISROUND if $main::please_freeze;
728     if ($main::please_info)
729     {
730       $main::please_info = 0;
731       freeze();
732       collate_output();
733       save_meta(1);
734       update_progress_stats();
735     }
736     my $gotsome
737         = readfrompipes ()
738         + reapchildren ();
739     if (!$gotsome)
740     {
741       check_refresh_wanted();
742       check_squeue();
743       update_progress_stats();
744       select (undef, undef, undef, 0.1);
745     }
746     elsif (time - $progress_stats_updated >= 30)
747     {
748       update_progress_stats();
749     }
750     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
751         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
752     {
753       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
754           .($thisround_failed+$thisround_succeeded)
755           .") -- giving up on this round";
756       Log (undef, $message);
757       last THISROUND;
758     }
759
760     # move slots from freeslot to holdslot (or back to freeslot) if necessary
761     for (my $i=$#freeslot; $i>=0; $i--) {
762       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
763         push @holdslot, (splice @freeslot, $i, 1);
764       }
765     }
766     for (my $i=$#holdslot; $i>=0; $i--) {
767       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
768         push @freeslot, (splice @holdslot, $i, 1);
769       }
770     }
771
772     # give up if no nodes are succeeding
773     if (!grep { $_->{node}->{losing_streak} == 0 &&
774                     $_->{node}->{hold_count} < 4 } @slot) {
775       my $message = "Every node has failed -- giving up on this round";
776       Log (undef, $message);
777       last THISROUND;
778     }
779   }
780 }
781
782
783 push @freeslot, splice @holdslot;
784 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
785
786
787 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
788 while (%proc)
789 {
790   if ($main::please_continue) {
791     $main::please_continue = 0;
792     goto THISROUND;
793   }
794   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
795   readfrompipes ();
796   if (!reapchildren())
797   {
798     check_refresh_wanted();
799     check_squeue();
800     update_progress_stats();
801     select (undef, undef, undef, 0.1);
802     killem (keys %proc) if $main::please_freeze;
803   }
804 }
805
806 update_progress_stats();
807 freeze_if_want_freeze();
808
809
810 if (!defined $main::success)
811 {
812   if (@jobstep_todo &&
813       $thisround_succeeded == 0 &&
814       ($thisround_failed == 0 || $thisround_failed > 4))
815   {
816     my $message = "stop because $thisround_failed tasks failed and none succeeded";
817     Log (undef, $message);
818     $main::success = 0;
819   }
820   if (!@jobstep_todo)
821   {
822     $main::success = 1;
823   }
824 }
825
826 goto ONELEVEL if !defined $main::success;
827
828
829 release_allocation();
830 freeze();
831 my $collated_output = &collate_output();
832
833 if ($job_has_uuid) {
834   $Job->update_attributes('running' => 0,
835                           'success' => $collated_output && $main::success,
836                           'finished_at' => scalar gmtime)
837 }
838
839 if ($collated_output)
840 {
841   eval {
842     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
843         or die "failed to get collated manifest: $!";
844     # Read the original manifest, and strip permission hints from it,
845     # so we can put the result in a Collection.
846     my @stripped_manifest_lines = ();
847     my $orig_manifest_text = '';
848     while (my $manifest_line = <$orig_manifest>) {
849       $orig_manifest_text .= $manifest_line;
850       my @words = split(/ /, $manifest_line, -1);
851       foreach my $ii (0..$#words) {
852         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
853           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
854         }
855       }
856       push(@stripped_manifest_lines, join(" ", @words));
857     }
858     my $stripped_manifest_text = join("", @stripped_manifest_lines);
859     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
860       'uuid' => md5_hex($stripped_manifest_text),
861       'manifest_text' => $orig_manifest_text,
862     });
863     $Job->update_attributes('output' => $output->{uuid});
864     if ($Job->{'output_is_persistent'}) {
865       $arv->{'links'}->{'create'}->execute('link' => {
866         'tail_kind' => 'arvados#user',
867         'tail_uuid' => $User->{'uuid'},
868         'head_kind' => 'arvados#collection',
869         'head_uuid' => $Job->{'output'},
870         'link_class' => 'resources',
871         'name' => 'wants',
872       });
873     }
874   };
875   if ($@) {
876     Log (undef, "Failed to register output manifest: $@");
877   }
878 }
879
880 Log (undef, "finish");
881
882 save_meta();
883 exit 0;
884
885
886
887 sub update_progress_stats
888 {
889   $progress_stats_updated = time;
890   return if !$progress_is_dirty;
891   my ($todo, $done, $running) = (scalar @jobstep_todo,
892                                  scalar @jobstep_done,
893                                  scalar @slot - scalar @freeslot - scalar @holdslot);
894   $Job->{'tasks_summary'} ||= {};
895   $Job->{'tasks_summary'}->{'todo'} = $todo;
896   $Job->{'tasks_summary'}->{'done'} = $done;
897   $Job->{'tasks_summary'}->{'running'} = $running;
898   if ($job_has_uuid) {
899     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
900   }
901   Log (undef, "status: $done done, $running running, $todo todo");
902   $progress_is_dirty = 0;
903 }
904
905
906
907 sub reapchildren
908 {
909   my $pid = waitpid (-1, WNOHANG);
910   return 0 if $pid <= 0;
911
912   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
913                   . "."
914                   . $slot[$proc{$pid}->{slot}]->{cpu});
915   my $jobstepid = $proc{$pid}->{jobstep};
916   my $elapsed = time - $proc{$pid}->{time};
917   my $Jobstep = $jobstep[$jobstepid];
918
919   my $childstatus = $?;
920   my $exitvalue = $childstatus >> 8;
921   my $exitinfo = sprintf("exit %d signal %d%s",
922                          $exitvalue,
923                          $childstatus & 127,
924                          ($childstatus & 128 ? ' core dump' : ''));
925   $Jobstep->{'arvados_task'}->reload;
926   my $task_success = $Jobstep->{'arvados_task'}->{success};
927
928   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
929
930   if (!defined $task_success) {
931     # task did not indicate one way or the other --> fail
932     $Jobstep->{'arvados_task'}->{success} = 0;
933     $Jobstep->{'arvados_task'}->save;
934     $task_success = 0;
935   }
936
937   if (!$task_success)
938   {
939     my $temporary_fail;
940     $temporary_fail ||= $Jobstep->{node_fail};
941     $temporary_fail ||= ($exitvalue == 111);
942
943     ++$thisround_failed;
944     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
945
946     # Check for signs of a failed or misconfigured node
947     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
948         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
949       # Don't count this against jobstep failure thresholds if this
950       # node is already suspected faulty and srun exited quickly
951       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
952           $elapsed < 5) {
953         Log ($jobstepid, "blaming failure on suspect node " .
954              $slot[$proc{$pid}->{slot}]->{node}->{name});
955         $temporary_fail ||= 1;
956       }
957       ban_node_by_slot($proc{$pid}->{slot});
958     }
959
960     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
961                              ++$Jobstep->{'failures'},
962                              $temporary_fail ? 'temporary ' : 'permanent',
963                              $elapsed));
964
965     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
966       # Give up on this task, and the whole job
967       $main::success = 0;
968       $main::please_freeze = 1;
969     }
970     else {
971       # Put this task back on the todo queue
972       push @jobstep_todo, $jobstepid;
973     }
974     $Job->{'tasks_summary'}->{'failed'}++;
975   }
976   else
977   {
978     ++$thisround_succeeded;
979     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
980     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
981     push @jobstep_done, $jobstepid;
982     Log ($jobstepid, "success in $elapsed seconds");
983   }
984   $Jobstep->{exitcode} = $childstatus;
985   $Jobstep->{finishtime} = time;
986   process_stderr ($jobstepid, $task_success);
987   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
988
989   close $reader{$jobstepid};
990   delete $reader{$jobstepid};
991   delete $slot[$proc{$pid}->{slot}]->{pid};
992   push @freeslot, $proc{$pid}->{slot};
993   delete $proc{$pid};
994
995   if ($task_success) {
996     # Load new tasks
997     my $newtask_list = [];
998     my $newtask_results;
999     do {
1000       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1001         'where' => {
1002           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1003         },
1004         'order' => 'qsequence',
1005         'offset' => scalar(@$newtask_list),
1006       );
1007       push(@$newtask_list, @{$newtask_results->{items}});
1008     } while (@{$newtask_results->{items}});
1009     foreach my $arvados_task (@$newtask_list) {
1010       my $jobstep = {
1011         'level' => $arvados_task->{'sequence'},
1012         'failures' => 0,
1013         'arvados_task' => $arvados_task
1014       };
1015       push @jobstep, $jobstep;
1016       push @jobstep_todo, $#jobstep;
1017     }
1018   }
1019
1020   $progress_is_dirty = 1;
1021   1;
1022 }
1023
1024 sub check_refresh_wanted
1025 {
1026   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1027   if (@stat && $stat[9] > $latest_refresh) {
1028     $latest_refresh = scalar time;
1029     if ($job_has_uuid) {
1030       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1031       for my $attr ('cancelled_at',
1032                     'cancelled_by_user_uuid',
1033                     'cancelled_by_client_uuid') {
1034         $Job->{$attr} = $Job2->{$attr};
1035       }
1036       if ($Job->{'cancelled_at'}) {
1037         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1038              " by user " . $Job->{cancelled_by_user_uuid});
1039         $main::success = 0;
1040         $main::please_freeze = 1;
1041       }
1042     }
1043   }
1044 }
1045
1046 sub check_squeue
1047 {
1048   # return if the kill list was checked <4 seconds ago
1049   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1050   {
1051     return;
1052   }
1053   $squeue_kill_checked = time;
1054
1055   # use killem() on procs whose killtime is reached
1056   for (keys %proc)
1057   {
1058     if (exists $proc{$_}->{killtime}
1059         && $proc{$_}->{killtime} <= time)
1060     {
1061       killem ($_);
1062     }
1063   }
1064
1065   # return if the squeue was checked <60 seconds ago
1066   if (defined $squeue_checked && $squeue_checked > time - 60)
1067   {
1068     return;
1069   }
1070   $squeue_checked = time;
1071
1072   if (!$have_slurm)
1073   {
1074     # here is an opportunity to check for mysterious problems with local procs
1075     return;
1076   }
1077
1078   # get a list of steps still running
1079   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1080   chop @squeue;
1081   if ($squeue[-1] ne "ok")
1082   {
1083     return;
1084   }
1085   pop @squeue;
1086
1087   # which of my jobsteps are running, according to squeue?
1088   my %ok;
1089   foreach (@squeue)
1090   {
1091     if (/^(\d+)\.(\d+) (\S+)/)
1092     {
1093       if ($1 eq $ENV{SLURM_JOBID})
1094       {
1095         $ok{$3} = 1;
1096       }
1097     }
1098   }
1099
1100   # which of my active child procs (>60s old) were not mentioned by squeue?
1101   foreach (keys %proc)
1102   {
1103     if ($proc{$_}->{time} < time - 60
1104         && !exists $ok{$proc{$_}->{jobstepname}}
1105         && !exists $proc{$_}->{killtime})
1106     {
1107       # kill this proc if it hasn't exited in 30 seconds
1108       $proc{$_}->{killtime} = time + 30;
1109     }
1110   }
1111 }
1112
1113
1114 sub release_allocation
1115 {
1116   if ($have_slurm)
1117   {
1118     Log (undef, "release job allocation");
1119     system "scancel $ENV{SLURM_JOBID}";
1120   }
1121 }
1122
1123
1124 sub readfrompipes
1125 {
1126   my $gotsome = 0;
1127   foreach my $job (keys %reader)
1128   {
1129     my $buf;
1130     while (0 < sysread ($reader{$job}, $buf, 8192))
1131     {
1132       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1133       $jobstep[$job]->{stderr} .= $buf;
1134       preprocess_stderr ($job);
1135       if (length ($jobstep[$job]->{stderr}) > 16384)
1136       {
1137         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1138       }
1139       $gotsome = 1;
1140     }
1141   }
1142   return $gotsome;
1143 }
1144
1145
1146 sub preprocess_stderr
1147 {
1148   my $job = shift;
1149
1150   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1151     my $line = $1;
1152     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1153     Log ($job, "stderr $line");
1154     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1155       # whoa.
1156       $main::please_freeze = 1;
1157     }
1158     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1159       $jobstep[$job]->{node_fail} = 1;
1160       ban_node_by_slot($jobstep[$job]->{slotindex});
1161     }
1162   }
1163 }
1164
1165
1166 sub process_stderr
1167 {
1168   my $job = shift;
1169   my $task_success = shift;
1170   preprocess_stderr ($job);
1171
1172   map {
1173     Log ($job, "stderr $_");
1174   } split ("\n", $jobstep[$job]->{stderr});
1175 }
1176
1177 sub fetch_block
1178 {
1179   my $hash = shift;
1180   my ($keep, $child_out, $output_block);
1181
1182   my $cmd = "arv-get \Q$hash\E";
1183   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1184   sysread($keep, $output_block, 64 * 1024 * 1024);
1185   close $keep;
1186   return $output_block;
1187 }
1188
1189 sub collate_output
1190 {
1191   Log (undef, "collate");
1192
1193   my ($child_out, $child_in);
1194   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1195   my $joboutput;
1196   for (@jobstep)
1197   {
1198     next if (!exists $_->{'arvados_task'}->{output} ||
1199              !$_->{'arvados_task'}->{'success'} ||
1200              $_->{'exitcode'} != 0);
1201     my $output = $_->{'arvados_task'}->{output};
1202     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1203     {
1204       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1205       print $child_in $output;
1206     }
1207     elsif (@jobstep == 1)
1208     {
1209       $joboutput = $output;
1210       last;
1211     }
1212     elsif (defined (my $outblock = fetch_block ($output)))
1213     {
1214       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1215       print $child_in $outblock;
1216     }
1217     else
1218     {
1219       Log (undef, "XXX fetch_block($output) failed XXX");
1220       $main::success = 0;
1221     }
1222   }
1223   $child_in->close;
1224
1225   if (!defined $joboutput) {
1226     my $s = IO::Select->new($child_out);
1227     if ($s->can_read(120)) {
1228       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1229       chomp($joboutput);
1230     } else {
1231       Log (undef, "timed out reading from 'arv-put'");
1232     }
1233   }
1234   waitpid($pid, 0);
1235
1236   if ($joboutput)
1237   {
1238     Log (undef, "output $joboutput");
1239     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1240   }
1241   else
1242   {
1243     Log (undef, "output undef");
1244   }
1245   return $joboutput;
1246 }
1247
1248
1249 sub killem
1250 {
1251   foreach (@_)
1252   {
1253     my $sig = 2;                # SIGINT first
1254     if (exists $proc{$_}->{"sent_$sig"} &&
1255         time - $proc{$_}->{"sent_$sig"} > 4)
1256     {
1257       $sig = 15;                # SIGTERM if SIGINT doesn't work
1258     }
1259     if (exists $proc{$_}->{"sent_$sig"} &&
1260         time - $proc{$_}->{"sent_$sig"} > 4)
1261     {
1262       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1263     }
1264     if (!exists $proc{$_}->{"sent_$sig"})
1265     {
1266       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1267       kill $sig, $_;
1268       select (undef, undef, undef, 0.1);
1269       if ($sig == 2)
1270       {
1271         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1272       }
1273       $proc{$_}->{"sent_$sig"} = time;
1274       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1275     }
1276   }
1277 }
1278
1279
1280 sub fhbits
1281 {
1282   my($bits);
1283   for (@_) {
1284     vec($bits,fileno($_),1) = 1;
1285   }
1286   $bits;
1287 }
1288
1289
1290 sub Log                         # ($jobstep_id, $logmessage)
1291 {
1292   if ($_[1] =~ /\n/) {
1293     for my $line (split (/\n/, $_[1])) {
1294       Log ($_[0], $line);
1295     }
1296     return;
1297   }
1298   my $fh = select STDERR; $|=1; select $fh;
1299   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1300   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1301   $message .= "\n";
1302   my $datetime;
1303   if ($local_logfile || -t STDERR) {
1304     my @gmtime = gmtime;
1305     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1306                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1307   }
1308   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1309
1310   if ($local_logfile) {
1311     print $local_logfile $datetime . " " . $message;
1312   }
1313 }
1314
1315
1316 sub croak
1317 {
1318   my ($package, $file, $line) = caller;
1319   my $message = "@_ at $file line $line\n";
1320   Log (undef, $message);
1321   freeze() if @jobstep_todo;
1322   collate_output() if @jobstep_todo;
1323   cleanup();
1324   save_meta() if $local_logfile;
1325   die;
1326 }
1327
1328
1329 sub cleanup
1330 {
1331   return if !$job_has_uuid;
1332   $Job->update_attributes('running' => 0,
1333                           'success' => 0,
1334                           'finished_at' => scalar gmtime);
1335 }
1336
1337
1338 sub save_meta
1339 {
1340   my $justcheckpoint = shift; # false if this will be the last meta saved
1341   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1342
1343   $local_logfile->flush;
1344   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1345       . quotemeta($local_logfile->filename);
1346   my $loglocator = `$cmd`;
1347   die "system $cmd failed: $?" if $?;
1348   chomp($loglocator);
1349
1350   $local_logfile = undef;   # the temp file is automatically deleted
1351   Log (undef, "log manifest is $loglocator");
1352   $Job->{'log'} = $loglocator;
1353   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1354 }
1355
1356
1357 sub freeze_if_want_freeze
1358 {
1359   if ($main::please_freeze)
1360   {
1361     release_allocation();
1362     if (@_)
1363     {
1364       # kill some srun procs before freeze+stop
1365       map { $proc{$_} = {} } @_;
1366       while (%proc)
1367       {
1368         killem (keys %proc);
1369         select (undef, undef, undef, 0.1);
1370         my $died;
1371         while (($died = waitpid (-1, WNOHANG)) > 0)
1372         {
1373           delete $proc{$died};
1374         }
1375       }
1376     }
1377     freeze();
1378     collate_output();
1379     cleanup();
1380     save_meta();
1381     exit 0;
1382   }
1383 }
1384
1385
1386 sub freeze
1387 {
1388   Log (undef, "Freeze not implemented");
1389   return;
1390 }
1391
1392
1393 sub thaw
1394 {
1395   croak ("Thaw not implemented");
1396 }
1397
1398
1399 sub freezequote
1400 {
1401   my $s = shift;
1402   $s =~ s/\\/\\\\/g;
1403   $s =~ s/\n/\\n/g;
1404   return $s;
1405 }
1406
1407
1408 sub freezeunquote
1409 {
1410   my $s = shift;
1411   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1412   return $s;
1413 }
1414
1415
1416 sub srun
1417 {
1418   my $srunargs = shift;
1419   my $execargs = shift;
1420   my $opts = shift || {};
1421   my $stdin = shift;
1422   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1423   print STDERR (join (" ",
1424                       map { / / ? "'$_'" : $_ }
1425                       (@$args)),
1426                 "\n")
1427       if $ENV{CRUNCH_DEBUG};
1428
1429   if (defined $stdin) {
1430     my $child = open STDIN, "-|";
1431     defined $child or die "no fork: $!";
1432     if ($child == 0) {
1433       print $stdin or die $!;
1434       close STDOUT or die $!;
1435       exit 0;
1436     }
1437   }
1438
1439   return system (@$args) if $opts->{fork};
1440
1441   exec @$args;
1442   warn "ENV size is ".length(join(" ",%ENV));
1443   die "exec failed: $!: @$args";
1444 }
1445
1446
1447 sub ban_node_by_slot {
1448   # Don't start any new jobsteps on this node for 60 seconds
1449   my $slotid = shift;
1450   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1451   $slot[$slotid]->{node}->{hold_count}++;
1452   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1453 }
1454
1455 sub must_lock_now
1456 {
1457   my ($lockfile, $error_message) = @_;
1458   open L, ">", $lockfile or croak("$lockfile: $!");
1459   if (!flock L, LOCK_EX|LOCK_NB) {
1460     croak("Can't lock $lockfile: $error_message\n");
1461   }
1462 }
1463
1464 sub find_docker_hash {
1465   # Given a Keep locator, search for a matching link to find the Docker hash
1466   # of the stored image.
1467   my $locator = shift;
1468   my $links_result = $arv->{links}->{list}->execute(
1469     filters => [["head_uuid", "=", $locator],
1470                 ["link_class", "=", "docker_image_hash"]],
1471     limit => 1);
1472   my $docker_hash;
1473   foreach my $link (@{$links_result->{items}}) {
1474     $docker_hash = lc($link->{name});
1475   }
1476   return $docker_hash;
1477 }
1478
1479 __DATA__
1480 #!/usr/bin/perl
1481
1482 # checkout-and-build
1483
1484 use Fcntl ':flock';
1485
1486 my $destdir = $ENV{"CRUNCH_SRC"};
1487 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1488 my $repo = $ENV{"CRUNCH_SRC_URL"};
1489
1490 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1491 flock L, LOCK_EX;
1492 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1493     exit 0;
1494 }
1495
1496 unlink "$destdir.commit";
1497 open STDOUT, ">", "$destdir.log";
1498 open STDERR, ">&STDOUT";
1499
1500 mkdir $destdir;
1501 my @git_archive_data = <DATA>;
1502 if (@git_archive_data) {
1503   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1504   print TARX @git_archive_data;
1505   if(!close(TARX)) {
1506     die "'tar -C $destdir -xf -' exited $?: $!";
1507   }
1508 }
1509
1510 my $pwd;
1511 chomp ($pwd = `pwd`);
1512 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1513 mkdir $install_dir;
1514
1515 for my $src_path ("$destdir/arvados/sdk/python") {
1516   if (-d $src_path) {
1517     shell_or_die ("virtualenv", $install_dir);
1518     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1519   }
1520 }
1521
1522 if (-e "$destdir/crunch_scripts/install") {
1523     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1524 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1525     # Old version
1526     shell_or_die ("./tests/autotests.sh", $install_dir);
1527 } elsif (-e "./install.sh") {
1528     shell_or_die ("./install.sh", $install_dir);
1529 }
1530
1531 if ($commit) {
1532     unlink "$destdir.commit.new";
1533     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1534     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1535 }
1536
1537 close L;
1538
1539 exit 0;
1540
1541 sub shell_or_die
1542 {
1543   if ($ENV{"DEBUG"}) {
1544     print STDERR "@_\n";
1545   }
1546   system (@_) == 0
1547       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1548 }
1549
1550 __DATA__