Merge branch 'master' into 3118-docker-fixes
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $arv_cli;
100
101 if (defined $ENV{"ARV_CLI"}) {
102   $arv_cli = $ENV{"ARV_CLI"};
103 }
104 else {
105   $arv_cli = 'arv';
106 }
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     if ($Job->{'is_locked_by_uuid'}) {
156       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157     }
158     if ($Job->{'success'} ne undef) {
159       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160     }
161     if ($Job->{'running'}) {
162       croak("Job 'running' flag is already set");
163     }
164     if ($Job->{'started_at'}) {
165       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
166     }
167   }
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172
173   if (!$resume_stash)
174   {
175     map { croak ("No $_ specified") unless $Job->{$_} }
176     qw(script script_version script_parameters);
177   }
178
179   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180   $Job->{'started_at'} = gmtime;
181
182   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183
184   $job_has_uuid = 1;
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271
272 my $jobmanager_id;
273 if ($job_has_uuid)
274 {
275   # Claim this job, and make sure nobody else does
276   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278     croak("Error while updating / locking job");
279   }
280   $Job->update_attributes('started_at' => scalar gmtime,
281                           'running' => 1,
282                           'success' => undef,
283                           'tasks_summary' => { 'failed' => 0,
284                                                'todo' => 1,
285                                                'running' => 0,
286                                                'done' => 0 });
287 }
288
289
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
304
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
308
309
310 my @jobstep;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
315 my $squeue_checked;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
319
320
321
322 if (defined $Job->{thawedfromkey})
323 {
324   thaw ($Job->{thawedfromkey});
325 }
326 else
327 {
328   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329     'job_uuid' => $Job->{'uuid'},
330     'sequence' => 0,
331     'qsequence' => 0,
332     'parameters' => {},
333                                                           });
334   push @jobstep, { 'level' => 0,
335                    'failures' => 0,
336                    'arvados_task' => $first_task,
337                  };
338   push @jobstep_todo, 0;
339 }
340
341
342 if (!$have_slurm)
343 {
344   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
345 }
346
347
348 my $build_script;
349
350
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if ($skip_install)
355 {
356   if (!defined $no_clear_tmp) {
357     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358     system($clear_tmp_cmd) == 0
359         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360   }
361   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363     if (-d $src_path) {
364       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367           == 0
368           or croak ("setup.py in $src_path failed: exit ".($?>>8));
369     }
370   }
371 }
372 else
373 {
374   do {
375     local $/ = undef;
376     $build_script = <DATA>;
377   };
378   Log (undef, "Install revision ".$Job->{script_version});
379   my $nodelist = join(",", @node);
380
381   if (!defined $no_clear_tmp) {
382     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383
384     my $cleanpid = fork();
385     if ($cleanpid == 0)
386     {
387       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
389       exit (1);
390     }
391     while (1)
392     {
393       last if $cleanpid == waitpid (-1, WNOHANG);
394       freeze_if_want_freeze ($cleanpid);
395       select (undef, undef, undef, 0.1);
396     }
397     Log (undef, "Clean-work-dir exited $?");
398   }
399
400   # Install requested code version
401
402   my @execargs;
403   my @srunargs = ("srun",
404                   "--nodelist=$nodelist",
405                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406
407   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
409
410   my $commit;
411   my $git_archive;
412   my $treeish = $Job->{'script_version'};
413
414   # If we're running under crunch-dispatch, it will have pulled the
415   # appropriate source tree into its own repository, and given us that
416   # repo's path as $git_dir. If we're running a "local" job, and a
417   # script_version was specified, it's up to the user to provide the
418   # full path to a local repository in Job->{repository}.
419   #
420   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421   # git-archive --remote where appropriate.
422   #
423   # TODO: Accept a locally-hosted Arvados repository by name or
424   # UUID. Use arvados.v1.repositories.list or .get to figure out the
425   # appropriate fetch-url.
426   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427
428   $ENV{"CRUNCH_SRC_URL"} = $repo;
429
430   if (-d "$repo/.git") {
431     # We were given a working directory, but we are only interested in
432     # the index.
433     $repo = "$repo/.git";
434   }
435
436   # If this looks like a subversion r#, look for it in git-svn commit messages
437
438   if ($treeish =~ m{^\d{1,4}$}) {
439     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440     chomp $gitlog;
441     if ($gitlog =~ /^[a-f0-9]{40}$/) {
442       $commit = $gitlog;
443       Log (undef, "Using commit $commit for script_version $treeish");
444     }
445   }
446
447   # If that didn't work, try asking git to look it up as a tree-ish.
448
449   if (!defined $commit) {
450     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451     chomp $found;
452     if ($found =~ /^[0-9a-f]{40}$/s) {
453       $commit = $found;
454       if ($commit ne $treeish) {
455         # Make sure we record the real commit id in the database,
456         # frozentokey, logs, etc. -- instead of an abbreviation or a
457         # branch name which can become ambiguous or point to a
458         # different commit in the future.
459         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460         Log (undef, "Using commit $commit for tree-ish $treeish");
461         if ($commit ne $treeish) {
462           $Job->{'script_version'} = $commit;
463           !$job_has_uuid or
464               $Job->update_attributes('script_version' => $commit) or
465               croak("Error while updating job");
466         }
467       }
468     }
469   }
470
471   if (defined $commit) {
472     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473     @execargs = ("sh", "-c",
474                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476   }
477   else {
478     croak ("could not figure out commit id for $treeish");
479   }
480
481   my $installpid = fork();
482   if ($installpid == 0)
483   {
484     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
485     exit (1);
486   }
487   while (1)
488   {
489     last if $installpid == waitpid (-1, WNOHANG);
490     freeze_if_want_freeze ($installpid);
491     select (undef, undef, undef, 0.1);
492   }
493   Log (undef, "Install exited $?");
494 }
495
496 if (!$have_slurm)
497 {
498   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 }
501
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my ($docker_locator, $docker_hash);
505 if ($docker_locator = $Job->{docker_image_locator}) {
506   $docker_hash = find_docker_hash($docker_locator);
507   if (!$docker_hash)
508   {
509     croak("No Docker image hash found from locator $docker_locator");
510   }
511   my $docker_install_script = qq{
512 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
513     arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
514 fi
515 };
516   my $docker_pid = fork();
517   if ($docker_pid == 0)
518   {
519     srun (["srun", "--nodelist=" . join(',', @node)],
520           ["/bin/sh", "-ec", $docker_install_script]);
521     exit ($?);
522   }
523   while (1)
524   {
525     last if $docker_pid == waitpid (-1, WNOHANG);
526     freeze_if_want_freeze ($docker_pid);
527     select (undef, undef, undef, 0.1);
528   }
529   if ($? != 0)
530   {
531     croak("Installing Docker image from $docker_locator returned exit code $?");
532   }
533 }
534
535 foreach (qw (script script_version script_parameters runtime_constraints))
536 {
537   Log (undef,
538        "$_ " .
539        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
540 }
541 foreach (split (/\n/, $Job->{knobs}))
542 {
543   Log (undef, "knob " . $_);
544 }
545
546
547
548 $main::success = undef;
549
550
551
552 ONELEVEL:
553
554 my $thisround_succeeded = 0;
555 my $thisround_failed = 0;
556 my $thisround_failed_multiple = 0;
557
558 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
559                        or $a <=> $b } @jobstep_todo;
560 my $level = $jobstep[$jobstep_todo[0]]->{level};
561 Log (undef, "start level $level");
562
563
564
565 my %proc;
566 my @freeslot = (0..$#slot);
567 my @holdslot;
568 my %reader;
569 my $progress_is_dirty = 1;
570 my $progress_stats_updated = 0;
571
572 update_progress_stats();
573
574
575
576 THISROUND:
577 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
578 {
579   my $id = $jobstep_todo[$todo_ptr];
580   my $Jobstep = $jobstep[$id];
581   if ($Jobstep->{level} != $level)
582   {
583     next;
584   }
585
586   pipe $reader{$id}, "writer" or croak ($!);
587   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
588   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
589
590   my $childslot = $freeslot[0];
591   my $childnode = $slot[$childslot]->{node};
592   my $childslotname = join (".",
593                             $slot[$childslot]->{node}->{name},
594                             $slot[$childslot]->{cpu});
595   my $childpid = fork();
596   if ($childpid == 0)
597   {
598     $SIG{'INT'} = 'DEFAULT';
599     $SIG{'QUIT'} = 'DEFAULT';
600     $SIG{'TERM'} = 'DEFAULT';
601
602     foreach (values (%reader))
603     {
604       close($_);
605     }
606     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
607     open(STDOUT,">&writer");
608     open(STDERR,">&writer");
609
610     undef $dbh;
611     undef $sth;
612
613     delete $ENV{"GNUPGHOME"};
614     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
615     $ENV{"TASK_QSEQUENCE"} = $id;
616     $ENV{"TASK_SEQUENCE"} = $level;
617     $ENV{"JOB_SCRIPT"} = $Job->{script};
618     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
619       $param =~ tr/a-z/A-Z/;
620       $ENV{"JOB_PARAMETER_$param"} = $value;
621     }
622     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
623     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
624     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
625     $ENV{"HOME"} = $ENV{"TASK_WORK"};
626     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
627     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
628     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
629     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
630
631     $ENV{"GZIP"} = "-n";
632
633     my @srunargs = (
634       "srun",
635       "--nodelist=".$childnode->{name},
636       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
637       "--job-name=$job_id.$id.$$",
638         );
639     my $build_script_to_send = "";
640     my $command =
641         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
642         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
643         ."&& chmod og+wrx $ENV{TASK_WORK}"
644         ."&& cd $ENV{CRUNCH_TMP} ";
645     if ($build_script)
646     {
647       $build_script_to_send = $build_script;
648       $command .=
649           "&& perl -";
650     }
651     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
652     if ($docker_hash)
653     {
654       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
655       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr --cidfile=$ENV{TASK_WORK}/docker.cid ";
656       # Dynamically configure the container to use the host system as its
657       # DNS server.  Get the host's global addresses from the ip command,
658       # and turn them into docker --dns options using gawk.
659       $command .=
660           q{$(ip -o address show scope global |
661               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
662       $command .= "-v \Q$ENV{TASK_WORK}:/tmp/crunch-job:rw\E ";
663       $command .= "-v \Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
664       $command .= "-v \Q$ENV{TASK_KEEPMOUNT}:/mnt:ro\E ";
665       $command .= "-e \QHOME=/tmp/crunch-job\E ";
666       while (my ($env_key, $env_val) = each %ENV)
667       {
668         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
669           if ($env_key eq "TASK_WORK") {
670             $command .= "-e \QTASK_WORK=/tmp/crunch-job\E ";
671           }
672           elsif ($env_key eq "TASK_KEEPMOUNT") {
673             $command .= "-e \QTASK_KEEPMOUNT=/mnt\E ";
674           }
675           elsif ($env_key eq "CRUNCH_SRC") {
676             $command .= "-e \QCRUNCH_SRC=/tmp/crunch-src\E ";
677           }
678           else {
679             $command .= "-e \Q$env_key=$env_val\E ";
680           }
681         }
682       }
683       $command .= "\Q$docker_hash\E ";
684       $command .= "stdbuf -o0 -e0 ";
685       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
686     } else {
687       # Non-docker run
688       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
689       $command .= "stdbuf -o0 -e0 ";
690       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
691     }
692
693     my @execargs = ('bash', '-c', $command);
694     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
695     exit (111);
696   }
697   close("writer");
698   if (!defined $childpid)
699   {
700     close $reader{$id};
701     delete $reader{$id};
702     next;
703   }
704   shift @freeslot;
705   $proc{$childpid} = { jobstep => $id,
706                        time => time,
707                        slot => $childslot,
708                        jobstepname => "$job_id.$id.$childpid",
709                      };
710   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
711   $slot[$childslot]->{pid} = $childpid;
712
713   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
714   Log ($id, "child $childpid started on $childslotname");
715   $Jobstep->{starttime} = time;
716   $Jobstep->{node} = $childnode->{name};
717   $Jobstep->{slotindex} = $childslot;
718   delete $Jobstep->{stderr};
719   delete $Jobstep->{finishtime};
720
721   splice @jobstep_todo, $todo_ptr, 1;
722   --$todo_ptr;
723
724   $progress_is_dirty = 1;
725
726   while (!@freeslot
727          ||
728          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
729   {
730     last THISROUND if $main::please_freeze;
731     if ($main::please_info)
732     {
733       $main::please_info = 0;
734       freeze();
735       collate_output();
736       save_meta(1);
737       update_progress_stats();
738     }
739     my $gotsome
740         = readfrompipes ()
741         + reapchildren ();
742     if (!$gotsome)
743     {
744       check_refresh_wanted();
745       check_squeue();
746       update_progress_stats();
747       select (undef, undef, undef, 0.1);
748     }
749     elsif (time - $progress_stats_updated >= 30)
750     {
751       update_progress_stats();
752     }
753     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
754         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
755     {
756       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
757           .($thisround_failed+$thisround_succeeded)
758           .") -- giving up on this round";
759       Log (undef, $message);
760       last THISROUND;
761     }
762
763     # move slots from freeslot to holdslot (or back to freeslot) if necessary
764     for (my $i=$#freeslot; $i>=0; $i--) {
765       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
766         push @holdslot, (splice @freeslot, $i, 1);
767       }
768     }
769     for (my $i=$#holdslot; $i>=0; $i--) {
770       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
771         push @freeslot, (splice @holdslot, $i, 1);
772       }
773     }
774
775     # give up if no nodes are succeeding
776     if (!grep { $_->{node}->{losing_streak} == 0 &&
777                     $_->{node}->{hold_count} < 4 } @slot) {
778       my $message = "Every node has failed -- giving up on this round";
779       Log (undef, $message);
780       last THISROUND;
781     }
782   }
783 }
784
785
786 push @freeslot, splice @holdslot;
787 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
788
789
790 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
791 while (%proc)
792 {
793   if ($main::please_continue) {
794     $main::please_continue = 0;
795     goto THISROUND;
796   }
797   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
798   readfrompipes ();
799   if (!reapchildren())
800   {
801     check_refresh_wanted();
802     check_squeue();
803     update_progress_stats();
804     select (undef, undef, undef, 0.1);
805     killem (keys %proc) if $main::please_freeze;
806   }
807 }
808
809 update_progress_stats();
810 freeze_if_want_freeze();
811
812
813 if (!defined $main::success)
814 {
815   if (@jobstep_todo &&
816       $thisround_succeeded == 0 &&
817       ($thisround_failed == 0 || $thisround_failed > 4))
818   {
819     my $message = "stop because $thisround_failed tasks failed and none succeeded";
820     Log (undef, $message);
821     $main::success = 0;
822   }
823   if (!@jobstep_todo)
824   {
825     $main::success = 1;
826   }
827 }
828
829 goto ONELEVEL if !defined $main::success;
830
831
832 release_allocation();
833 freeze();
834 my $collated_output = &collate_output();
835
836 if ($job_has_uuid) {
837   $Job->update_attributes('running' => 0,
838                           'success' => $collated_output && $main::success,
839                           'finished_at' => scalar gmtime)
840 }
841
842 if ($collated_output)
843 {
844   eval {
845     open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
846         or die "failed to get collated manifest: $!";
847     # Read the original manifest, and strip permission hints from it,
848     # so we can put the result in a Collection.
849     my @stripped_manifest_lines = ();
850     my $orig_manifest_text = '';
851     while (my $manifest_line = <$orig_manifest>) {
852       $orig_manifest_text .= $manifest_line;
853       my @words = split(/ /, $manifest_line, -1);
854       foreach my $ii (0..$#words) {
855         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
856           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
857         }
858       }
859       push(@stripped_manifest_lines, join(" ", @words));
860     }
861     my $stripped_manifest_text = join("", @stripped_manifest_lines);
862     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
863       'uuid' => md5_hex($stripped_manifest_text),
864       'manifest_text' => $orig_manifest_text,
865     });
866     $Job->update_attributes('output' => $output->{uuid});
867     if ($Job->{'output_is_persistent'}) {
868       $arv->{'links'}->{'create'}->execute('link' => {
869         'tail_kind' => 'arvados#user',
870         'tail_uuid' => $User->{'uuid'},
871         'head_kind' => 'arvados#collection',
872         'head_uuid' => $Job->{'output'},
873         'link_class' => 'resources',
874         'name' => 'wants',
875       });
876     }
877   };
878   if ($@) {
879     Log (undef, "Failed to register output manifest: $@");
880   }
881 }
882
883 Log (undef, "finish");
884
885 save_meta();
886 exit 0;
887
888
889
890 sub update_progress_stats
891 {
892   $progress_stats_updated = time;
893   return if !$progress_is_dirty;
894   my ($todo, $done, $running) = (scalar @jobstep_todo,
895                                  scalar @jobstep_done,
896                                  scalar @slot - scalar @freeslot - scalar @holdslot);
897   $Job->{'tasks_summary'} ||= {};
898   $Job->{'tasks_summary'}->{'todo'} = $todo;
899   $Job->{'tasks_summary'}->{'done'} = $done;
900   $Job->{'tasks_summary'}->{'running'} = $running;
901   if ($job_has_uuid) {
902     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
903   }
904   Log (undef, "status: $done done, $running running, $todo todo");
905   $progress_is_dirty = 0;
906 }
907
908
909
910 sub reapchildren
911 {
912   my $pid = waitpid (-1, WNOHANG);
913   return 0 if $pid <= 0;
914
915   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
916                   . "."
917                   . $slot[$proc{$pid}->{slot}]->{cpu});
918   my $jobstepid = $proc{$pid}->{jobstep};
919   my $elapsed = time - $proc{$pid}->{time};
920   my $Jobstep = $jobstep[$jobstepid];
921
922   my $childstatus = $?;
923   my $exitvalue = $childstatus >> 8;
924   my $exitinfo = sprintf("exit %d signal %d%s",
925                          $exitvalue,
926                          $childstatus & 127,
927                          ($childstatus & 128 ? ' core dump' : ''));
928   $Jobstep->{'arvados_task'}->reload;
929   my $task_success = $Jobstep->{'arvados_task'}->{success};
930
931   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
932
933   if (!defined $task_success) {
934     # task did not indicate one way or the other --> fail
935     $Jobstep->{'arvados_task'}->{success} = 0;
936     $Jobstep->{'arvados_task'}->save;
937     $task_success = 0;
938   }
939
940   if (!$task_success)
941   {
942     my $temporary_fail;
943     $temporary_fail ||= $Jobstep->{node_fail};
944     $temporary_fail ||= ($exitvalue == 111);
945
946     ++$thisround_failed;
947     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
948
949     # Check for signs of a failed or misconfigured node
950     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
951         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
952       # Don't count this against jobstep failure thresholds if this
953       # node is already suspected faulty and srun exited quickly
954       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
955           $elapsed < 5) {
956         Log ($jobstepid, "blaming failure on suspect node " .
957              $slot[$proc{$pid}->{slot}]->{node}->{name});
958         $temporary_fail ||= 1;
959       }
960       ban_node_by_slot($proc{$pid}->{slot});
961     }
962
963     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
964                              ++$Jobstep->{'failures'},
965                              $temporary_fail ? 'temporary ' : 'permanent',
966                              $elapsed));
967
968     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
969       # Give up on this task, and the whole job
970       $main::success = 0;
971       $main::please_freeze = 1;
972     }
973     else {
974       # Put this task back on the todo queue
975       push @jobstep_todo, $jobstepid;
976     }
977     $Job->{'tasks_summary'}->{'failed'}++;
978   }
979   else
980   {
981     ++$thisround_succeeded;
982     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
983     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
984     push @jobstep_done, $jobstepid;
985     Log ($jobstepid, "success in $elapsed seconds");
986   }
987   $Jobstep->{exitcode} = $childstatus;
988   $Jobstep->{finishtime} = time;
989   process_stderr ($jobstepid, $task_success);
990   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
991
992   close $reader{$jobstepid};
993   delete $reader{$jobstepid};
994   delete $slot[$proc{$pid}->{slot}]->{pid};
995   push @freeslot, $proc{$pid}->{slot};
996   delete $proc{$pid};
997
998   # Load new tasks
999   my $newtask_list = [];
1000   my $newtask_results;
1001   do {
1002     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1003       'where' => {
1004         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1005       },
1006       'order' => 'qsequence',
1007       'offset' => scalar(@$newtask_list),
1008     );
1009     push(@$newtask_list, @{$newtask_results->{items}});
1010   } while (@{$newtask_results->{items}});
1011   foreach my $arvados_task (@$newtask_list) {
1012     my $jobstep = {
1013       'level' => $arvados_task->{'sequence'},
1014       'failures' => 0,
1015       'arvados_task' => $arvados_task
1016     };
1017     push @jobstep, $jobstep;
1018     push @jobstep_todo, $#jobstep;
1019   }
1020
1021   $progress_is_dirty = 1;
1022   1;
1023 }
1024
1025 sub check_refresh_wanted
1026 {
1027   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1028   if (@stat && $stat[9] > $latest_refresh) {
1029     $latest_refresh = scalar time;
1030     if ($job_has_uuid) {
1031       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1032       for my $attr ('cancelled_at',
1033                     'cancelled_by_user_uuid',
1034                     'cancelled_by_client_uuid') {
1035         $Job->{$attr} = $Job2->{$attr};
1036       }
1037       if ($Job->{'cancelled_at'}) {
1038         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1039              " by user " . $Job->{cancelled_by_user_uuid});
1040         $main::success = 0;
1041         $main::please_freeze = 1;
1042       }
1043     }
1044   }
1045 }
1046
1047 sub check_squeue
1048 {
1049   # return if the kill list was checked <4 seconds ago
1050   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1051   {
1052     return;
1053   }
1054   $squeue_kill_checked = time;
1055
1056   # use killem() on procs whose killtime is reached
1057   for (keys %proc)
1058   {
1059     if (exists $proc{$_}->{killtime}
1060         && $proc{$_}->{killtime} <= time)
1061     {
1062       killem ($_);
1063     }
1064   }
1065
1066   # return if the squeue was checked <60 seconds ago
1067   if (defined $squeue_checked && $squeue_checked > time - 60)
1068   {
1069     return;
1070   }
1071   $squeue_checked = time;
1072
1073   if (!$have_slurm)
1074   {
1075     # here is an opportunity to check for mysterious problems with local procs
1076     return;
1077   }
1078
1079   # get a list of steps still running
1080   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1081   chop @squeue;
1082   if ($squeue[-1] ne "ok")
1083   {
1084     return;
1085   }
1086   pop @squeue;
1087
1088   # which of my jobsteps are running, according to squeue?
1089   my %ok;
1090   foreach (@squeue)
1091   {
1092     if (/^(\d+)\.(\d+) (\S+)/)
1093     {
1094       if ($1 eq $ENV{SLURM_JOBID})
1095       {
1096         $ok{$3} = 1;
1097       }
1098     }
1099   }
1100
1101   # which of my active child procs (>60s old) were not mentioned by squeue?
1102   foreach (keys %proc)
1103   {
1104     if ($proc{$_}->{time} < time - 60
1105         && !exists $ok{$proc{$_}->{jobstepname}}
1106         && !exists $proc{$_}->{killtime})
1107     {
1108       # kill this proc if it hasn't exited in 30 seconds
1109       $proc{$_}->{killtime} = time + 30;
1110     }
1111   }
1112 }
1113
1114
1115 sub release_allocation
1116 {
1117   if ($have_slurm)
1118   {
1119     Log (undef, "release job allocation");
1120     system "scancel $ENV{SLURM_JOBID}";
1121   }
1122 }
1123
1124
1125 sub readfrompipes
1126 {
1127   my $gotsome = 0;
1128   foreach my $job (keys %reader)
1129   {
1130     my $buf;
1131     while (0 < sysread ($reader{$job}, $buf, 8192))
1132     {
1133       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1134       $jobstep[$job]->{stderr} .= $buf;
1135       preprocess_stderr ($job);
1136       if (length ($jobstep[$job]->{stderr}) > 16384)
1137       {
1138         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1139       }
1140       $gotsome = 1;
1141     }
1142   }
1143   return $gotsome;
1144 }
1145
1146
1147 sub preprocess_stderr
1148 {
1149   my $job = shift;
1150
1151   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1152     my $line = $1;
1153     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1154     Log ($job, "stderr $line");
1155     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1156       # whoa.
1157       $main::please_freeze = 1;
1158     }
1159     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1160       $jobstep[$job]->{node_fail} = 1;
1161       ban_node_by_slot($jobstep[$job]->{slotindex});
1162     }
1163   }
1164 }
1165
1166
1167 sub process_stderr
1168 {
1169   my $job = shift;
1170   my $task_success = shift;
1171   preprocess_stderr ($job);
1172
1173   map {
1174     Log ($job, "stderr $_");
1175   } split ("\n", $jobstep[$job]->{stderr});
1176 }
1177
1178 sub fetch_block
1179 {
1180   my $hash = shift;
1181   my ($keep, $child_out, $output_block);
1182
1183   my $cmd = "$arv_cli keep get \Q$hash\E";
1184   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1185   sysread($keep, $output_block, 64 * 1024 * 1024);
1186   close $keep;
1187   return $output_block;
1188 }
1189
1190 sub collate_output
1191 {
1192   Log (undef, "collate");
1193
1194   my ($child_out, $child_in);
1195   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1196   my $joboutput;
1197   for (@jobstep)
1198   {
1199     next if (!exists $_->{'arvados_task'}->{output} ||
1200              !$_->{'arvados_task'}->{'success'} ||
1201              $_->{'exitcode'} != 0);
1202     my $output = $_->{'arvados_task'}->{output};
1203     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1204     {
1205       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1206       print $child_in $output;
1207     }
1208     elsif (@jobstep == 1)
1209     {
1210       $joboutput = $output;
1211       last;
1212     }
1213     elsif (defined (my $outblock = fetch_block ($output)))
1214     {
1215       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1216       print $child_in $outblock;
1217     }
1218     else
1219     {
1220       Log (undef, "XXX fetch_block($output) failed XXX");
1221       $main::success = 0;
1222     }
1223   }
1224   $child_in->close;
1225
1226   if (!defined $joboutput) {
1227     my $s = IO::Select->new($child_out);
1228     if ($s->can_read(120)) {
1229       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1230       chomp($joboutput);
1231     } else {
1232       Log (undef, "timed out reading from 'arv keep put'");
1233     }
1234   }
1235   waitpid($pid, 0);
1236
1237   if ($joboutput)
1238   {
1239     Log (undef, "output $joboutput");
1240     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1241   }
1242   else
1243   {
1244     Log (undef, "output undef");
1245   }
1246   return $joboutput;
1247 }
1248
1249
1250 sub killem
1251 {
1252   foreach (@_)
1253   {
1254     my $sig = 2;                # SIGINT first
1255     if (exists $proc{$_}->{"sent_$sig"} &&
1256         time - $proc{$_}->{"sent_$sig"} > 4)
1257     {
1258       $sig = 15;                # SIGTERM if SIGINT doesn't work
1259     }
1260     if (exists $proc{$_}->{"sent_$sig"} &&
1261         time - $proc{$_}->{"sent_$sig"} > 4)
1262     {
1263       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1264     }
1265     if (!exists $proc{$_}->{"sent_$sig"})
1266     {
1267       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1268       kill $sig, $_;
1269       select (undef, undef, undef, 0.1);
1270       if ($sig == 2)
1271       {
1272         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1273       }
1274       $proc{$_}->{"sent_$sig"} = time;
1275       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1276     }
1277   }
1278 }
1279
1280
1281 sub fhbits
1282 {
1283   my($bits);
1284   for (@_) {
1285     vec($bits,fileno($_),1) = 1;
1286   }
1287   $bits;
1288 }
1289
1290
1291 sub Log                         # ($jobstep_id, $logmessage)
1292 {
1293   if ($_[1] =~ /\n/) {
1294     for my $line (split (/\n/, $_[1])) {
1295       Log ($_[0], $line);
1296     }
1297     return;
1298   }
1299   my $fh = select STDERR; $|=1; select $fh;
1300   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1301   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1302   $message .= "\n";
1303   my $datetime;
1304   if ($local_logfile || -t STDERR) {
1305     my @gmtime = gmtime;
1306     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1307                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1308   }
1309   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1310
1311   if ($local_logfile) {
1312     print $local_logfile $datetime . " " . $message;
1313   }
1314 }
1315
1316
1317 sub croak
1318 {
1319   my ($package, $file, $line) = caller;
1320   my $message = "@_ at $file line $line\n";
1321   Log (undef, $message);
1322   freeze() if @jobstep_todo;
1323   collate_output() if @jobstep_todo;
1324   cleanup();
1325   save_meta() if $local_logfile;
1326   die;
1327 }
1328
1329
1330 sub cleanup
1331 {
1332   return if !$job_has_uuid;
1333   $Job->update_attributes('running' => 0,
1334                           'success' => 0,
1335                           'finished_at' => scalar gmtime);
1336 }
1337
1338
1339 sub save_meta
1340 {
1341   my $justcheckpoint = shift; # false if this will be the last meta saved
1342   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1343
1344   $local_logfile->flush;
1345   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1346       . quotemeta($local_logfile->filename);
1347   my $loglocator = `$cmd`;
1348   die "system $cmd failed: $?" if $?;
1349   chomp($loglocator);
1350
1351   $local_logfile = undef;   # the temp file is automatically deleted
1352   Log (undef, "log manifest is $loglocator");
1353   $Job->{'log'} = $loglocator;
1354   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1355 }
1356
1357
1358 sub freeze_if_want_freeze
1359 {
1360   if ($main::please_freeze)
1361   {
1362     release_allocation();
1363     if (@_)
1364     {
1365       # kill some srun procs before freeze+stop
1366       map { $proc{$_} = {} } @_;
1367       while (%proc)
1368       {
1369         killem (keys %proc);
1370         select (undef, undef, undef, 0.1);
1371         my $died;
1372         while (($died = waitpid (-1, WNOHANG)) > 0)
1373         {
1374           delete $proc{$died};
1375         }
1376       }
1377     }
1378     freeze();
1379     collate_output();
1380     cleanup();
1381     save_meta();
1382     exit 0;
1383   }
1384 }
1385
1386
1387 sub freeze
1388 {
1389   Log (undef, "Freeze not implemented");
1390   return;
1391 }
1392
1393
1394 sub thaw
1395 {
1396   croak ("Thaw not implemented");
1397 }
1398
1399
1400 sub freezequote
1401 {
1402   my $s = shift;
1403   $s =~ s/\\/\\\\/g;
1404   $s =~ s/\n/\\n/g;
1405   return $s;
1406 }
1407
1408
1409 sub freezeunquote
1410 {
1411   my $s = shift;
1412   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1413   return $s;
1414 }
1415
1416
1417 sub srun
1418 {
1419   my $srunargs = shift;
1420   my $execargs = shift;
1421   my $opts = shift || {};
1422   my $stdin = shift;
1423   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1424   print STDERR (join (" ",
1425                       map { / / ? "'$_'" : $_ }
1426                       (@$args)),
1427                 "\n")
1428       if $ENV{CRUNCH_DEBUG};
1429
1430   if (defined $stdin) {
1431     my $child = open STDIN, "-|";
1432     defined $child or die "no fork: $!";
1433     if ($child == 0) {
1434       print $stdin or die $!;
1435       close STDOUT or die $!;
1436       exit 0;
1437     }
1438   }
1439
1440   return system (@$args) if $opts->{fork};
1441
1442   exec @$args;
1443   warn "ENV size is ".length(join(" ",%ENV));
1444   die "exec failed: $!: @$args";
1445 }
1446
1447
1448 sub ban_node_by_slot {
1449   # Don't start any new jobsteps on this node for 60 seconds
1450   my $slotid = shift;
1451   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1452   $slot[$slotid]->{node}->{hold_count}++;
1453   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1454 }
1455
1456 sub must_lock_now
1457 {
1458   my ($lockfile, $error_message) = @_;
1459   open L, ">", $lockfile or croak("$lockfile: $!");
1460   if (!flock L, LOCK_EX|LOCK_NB) {
1461     croak("Can't lock $lockfile: $error_message\n");
1462   }
1463 }
1464
1465 sub find_docker_hash {
1466   # Given a Keep locator, search for a matching link to find the Docker hash
1467   # of the stored image.
1468   my $locator = shift;
1469   my $links_result = $arv->{links}->{list}->execute(
1470     filters => [["head_uuid", "=", $locator],
1471                 ["link_class", "=", "docker_image_hash"]],
1472     limit => 1);
1473   my $docker_hash;
1474   foreach my $link (@{$links_result->{items}}) {
1475     $docker_hash = lc($link->{name});
1476   }
1477   return $docker_hash;
1478 }
1479
1480 __DATA__
1481 #!/usr/bin/perl
1482
1483 # checkout-and-build
1484
1485 use Fcntl ':flock';
1486
1487 my $destdir = $ENV{"CRUNCH_SRC"};
1488 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1489 my $repo = $ENV{"CRUNCH_SRC_URL"};
1490
1491 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1492 flock L, LOCK_EX;
1493 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1494     exit 0;
1495 }
1496
1497 unlink "$destdir.commit";
1498 open STDOUT, ">", "$destdir.log";
1499 open STDERR, ">&STDOUT";
1500
1501 mkdir $destdir;
1502 my @git_archive_data = <DATA>;
1503 if (@git_archive_data) {
1504   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1505   print TARX @git_archive_data;
1506   if(!close(TARX)) {
1507     die "'tar -C $destdir -xf -' exited $?: $!";
1508   }
1509 }
1510
1511 my $pwd;
1512 chomp ($pwd = `pwd`);
1513 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1514 mkdir $install_dir;
1515
1516 for my $src_path ("$destdir/arvados/sdk/python") {
1517   if (-d $src_path) {
1518     shell_or_die ("virtualenv", $install_dir);
1519     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1520   }
1521 }
1522
1523 if (-e "$destdir/crunch_scripts/install") {
1524     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1525 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1526     # Old version
1527     shell_or_die ("./tests/autotests.sh", $install_dir);
1528 } elsif (-e "./install.sh") {
1529     shell_or_die ("./install.sh", $install_dir);
1530 }
1531
1532 if ($commit) {
1533     unlink "$destdir.commit.new";
1534     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1535     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1536 }
1537
1538 close L;
1539
1540 exit 0;
1541
1542 sub shell_or_die
1543 {
1544   if ($ENV{"DEBUG"}) {
1545     print STDERR "@_\n";
1546   }
1547   system (@_) == 0
1548       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1549 }
1550
1551 __DATA__