Merge remote-tracking branch 'origin/master' into job-docker-images
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $arv_cli;
100
101 if (defined $ENV{"ARV_CLI"}) {
102   $arv_cli = $ENV{"ARV_CLI"};
103 }
104 else {
105   $arv_cli = 'arv';
106 }
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     if ($Job->{'is_locked_by_uuid'}) {
156       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157     }
158     if ($Job->{'success'} ne undef) {
159       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160     }
161     if ($Job->{'running'}) {
162       croak("Job 'running' flag is already set");
163     }
164     if ($Job->{'started_at'}) {
165       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
166     }
167   }
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172
173   if (!$resume_stash)
174   {
175     map { croak ("No $_ specified") unless $Job->{$_} }
176     qw(script script_version script_parameters);
177   }
178
179   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180   $Job->{'started_at'} = gmtime;
181
182   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183
184   $job_has_uuid = 1;
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271
272 my $jobmanager_id;
273 if ($job_has_uuid)
274 {
275   # Claim this job, and make sure nobody else does
276   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278     croak("Error while updating / locking job");
279   }
280   $Job->update_attributes('started_at' => scalar gmtime,
281                           'running' => 1,
282                           'success' => undef,
283                           'tasks_summary' => { 'failed' => 0,
284                                                'todo' => 1,
285                                                'running' => 0,
286                                                'done' => 0 });
287 }
288
289
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
304
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
308
309
310 my @jobstep;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
315 my $squeue_checked;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
319
320
321
322 if (defined $Job->{thawedfromkey})
323 {
324   thaw ($Job->{thawedfromkey});
325 }
326 else
327 {
328   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329     'job_uuid' => $Job->{'uuid'},
330     'sequence' => 0,
331     'qsequence' => 0,
332     'parameters' => {},
333                                                           });
334   push @jobstep, { 'level' => 0,
335                    'failures' => 0,
336                    'arvados_task' => $first_task,
337                  };
338   push @jobstep_todo, 0;
339 }
340
341
342 if (!$have_slurm)
343 {
344   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
345 }
346
347
348 my $build_script;
349
350
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if ($skip_install)
355 {
356   if (!defined $no_clear_tmp) {
357     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358     system($clear_tmp_cmd) == 0
359         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360   }
361   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363     if (-d $src_path) {
364       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367           == 0
368           or croak ("setup.py in $src_path failed: exit ".($?>>8));
369     }
370   }
371 }
372 else
373 {
374   do {
375     local $/ = undef;
376     $build_script = <DATA>;
377   };
378   Log (undef, "Install revision ".$Job->{script_version});
379   my $nodelist = join(",", @node);
380
381   if (!defined $no_clear_tmp) {
382     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383
384     my $cleanpid = fork();
385     if ($cleanpid == 0)
386     {
387       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
389       exit (1);
390     }
391     while (1)
392     {
393       last if $cleanpid == waitpid (-1, WNOHANG);
394       freeze_if_want_freeze ($cleanpid);
395       select (undef, undef, undef, 0.1);
396     }
397     Log (undef, "Clean-work-dir exited $?");
398   }
399
400   # Install requested code version
401
402   my @execargs;
403   my @srunargs = ("srun",
404                   "--nodelist=$nodelist",
405                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406
407   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
409
410   my $commit;
411   my $git_archive;
412   my $treeish = $Job->{'script_version'};
413
414   # If we're running under crunch-dispatch, it will have pulled the
415   # appropriate source tree into its own repository, and given us that
416   # repo's path as $git_dir. If we're running a "local" job, and a
417   # script_version was specified, it's up to the user to provide the
418   # full path to a local repository in Job->{repository}.
419   #
420   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421   # git-archive --remote where appropriate.
422   #
423   # TODO: Accept a locally-hosted Arvados repository by name or
424   # UUID. Use arvados.v1.repositories.list or .get to figure out the
425   # appropriate fetch-url.
426   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427
428   $ENV{"CRUNCH_SRC_URL"} = $repo;
429
430   if (-d "$repo/.git") {
431     # We were given a working directory, but we are only interested in
432     # the index.
433     $repo = "$repo/.git";
434   }
435
436   # If this looks like a subversion r#, look for it in git-svn commit messages
437
438   if ($treeish =~ m{^\d{1,4}$}) {
439     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440     chomp $gitlog;
441     if ($gitlog =~ /^[a-f0-9]{40}$/) {
442       $commit = $gitlog;
443       Log (undef, "Using commit $commit for script_version $treeish");
444     }
445   }
446
447   # If that didn't work, try asking git to look it up as a tree-ish.
448
449   if (!defined $commit) {
450     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451     chomp $found;
452     if ($found =~ /^[0-9a-f]{40}$/s) {
453       $commit = $found;
454       if ($commit ne $treeish) {
455         # Make sure we record the real commit id in the database,
456         # frozentokey, logs, etc. -- instead of an abbreviation or a
457         # branch name which can become ambiguous or point to a
458         # different commit in the future.
459         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460         Log (undef, "Using commit $commit for tree-ish $treeish");
461         if ($commit ne $treeish) {
462           $Job->{'script_version'} = $commit;
463           !$job_has_uuid or
464               $Job->update_attributes('script_version' => $commit) or
465               croak("Error while updating job");
466         }
467       }
468     }
469   }
470
471   if (defined $commit) {
472     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473     @execargs = ("sh", "-c",
474                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476   }
477   else {
478     croak ("could not figure out commit id for $treeish");
479   }
480
481   my $installpid = fork();
482   if ($installpid == 0)
483   {
484     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
485     exit (1);
486   }
487   while (1)
488   {
489     last if $installpid == waitpid (-1, WNOHANG);
490     freeze_if_want_freeze ($installpid);
491     select (undef, undef, undef, 0.1);
492   }
493   Log (undef, "Install exited $?");
494 }
495
496 if (!$have_slurm)
497 {
498   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 }
501
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my ($docker_locator, $docker_hash);
505 if ($docker_locator = $Job->{docker_image_locator}) {
506   $docker_hash = find_docker_hash($docker_locator);
507   if (!$docker_hash)
508   {
509     croak("No Docker image hash found from locator $docker_locator");
510   }
511   my $docker_install_script = qq{
512 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
513     arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
514 fi
515 };
516   my $docker_pid = fork();
517   if ($docker_pid == 0)
518   {
519     srun (["srun", "--nodelist=" . join(',', @node)],
520           ["/bin/sh", "-ec", $docker_install_script]);
521     exit ($?);
522   }
523   while (1)
524   {
525     last if $docker_pid == waitpid (-1, WNOHANG);
526     freeze_if_want_freeze ($docker_pid);
527     select (undef, undef, undef, 0.1);
528   }
529   if ($? != 0)
530   {
531     croak("Installing Docker image from $docker_locator returned exit code $?");
532   }
533 }
534
535 foreach (qw (script script_version script_parameters runtime_constraints))
536 {
537   Log (undef,
538        "$_ " .
539        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
540 }
541 foreach (split (/\n/, $Job->{knobs}))
542 {
543   Log (undef, "knob " . $_);
544 }
545
546
547
548 $main::success = undef;
549
550
551
552 ONELEVEL:
553
554 my $thisround_succeeded = 0;
555 my $thisround_failed = 0;
556 my $thisround_failed_multiple = 0;
557
558 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
559                        or $a <=> $b } @jobstep_todo;
560 my $level = $jobstep[$jobstep_todo[0]]->{level};
561 Log (undef, "start level $level");
562
563
564
565 my %proc;
566 my @freeslot = (0..$#slot);
567 my @holdslot;
568 my %reader;
569 my $progress_is_dirty = 1;
570 my $progress_stats_updated = 0;
571
572 update_progress_stats();
573
574
575
576 THISROUND:
577 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
578 {
579   my $id = $jobstep_todo[$todo_ptr];
580   my $Jobstep = $jobstep[$id];
581   if ($Jobstep->{level} != $level)
582   {
583     next;
584   }
585
586   pipe $reader{$id}, "writer" or croak ($!);
587   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
588   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
589
590   my $childslot = $freeslot[0];
591   my $childnode = $slot[$childslot]->{node};
592   my $childslotname = join (".",
593                             $slot[$childslot]->{node}->{name},
594                             $slot[$childslot]->{cpu});
595   my $childpid = fork();
596   if ($childpid == 0)
597   {
598     $SIG{'INT'} = 'DEFAULT';
599     $SIG{'QUIT'} = 'DEFAULT';
600     $SIG{'TERM'} = 'DEFAULT';
601
602     foreach (values (%reader))
603     {
604       close($_);
605     }
606     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
607     open(STDOUT,">&writer");
608     open(STDERR,">&writer");
609
610     undef $dbh;
611     undef $sth;
612
613     delete $ENV{"GNUPGHOME"};
614     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
615     $ENV{"TASK_QSEQUENCE"} = $id;
616     $ENV{"TASK_SEQUENCE"} = $level;
617     $ENV{"JOB_SCRIPT"} = $Job->{script};
618     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
619       $param =~ tr/a-z/A-Z/;
620       $ENV{"JOB_PARAMETER_$param"} = $value;
621     }
622     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
623     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
624     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
625     $ENV{"HOME"} = $ENV{"TASK_WORK"};
626     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
627     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
628     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
629     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
630
631     $ENV{"GZIP"} = "-n";
632
633     my @srunargs = (
634       "srun",
635       "--nodelist=".$childnode->{name},
636       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
637       "--job-name=$job_id.$id.$$",
638         );
639     my $build_script_to_send = "";
640     my $command =
641         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
642         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
643         ."&& cd $ENV{CRUNCH_TMP} ";
644     if ($build_script)
645     {
646       $build_script_to_send = $build_script;
647       $command .=
648           "&& perl -";
649     }
650     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
651     if ($docker_hash)
652     {
653       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
654       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
655       # Dynamically configure the container to use the host system as its
656       # DNS server.  Get the host's global addresses from the ip command,
657       # and turn them into docker --dns options using gawk.
658       $command .=
659           q{$(ip -o address show scope global |
660               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
661       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
662       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
663       $command .= "--env=\QHOME=/home/crunch\E ";
664       while (my ($env_key, $env_val) = each %ENV)
665       {
666         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
667           if ($env_key eq "TASK_WORK") {
668             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
669           }
670           elsif ($env_key eq "TASK_KEEPMOUNT") {
671             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
672           }
673           elsif ($env_key eq "CRUNCH_SRC") {
674             $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
675           }
676           else {
677             $command .= "--env=\Q$env_key=$env_val\E ";
678           }
679         }
680       }
681       $command .= "\Q$docker_hash\E ";
682       $command .= "stdbuf --output=0 --error=0 ";
683       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
684     } else {
685       # Non-docker run
686       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
687       $command .= "stdbuf --output=0 --error=0 ";
688       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
689     }
690
691     my @execargs = ('bash', '-c', $command);
692     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
693     exit (111);
694   }
695   close("writer");
696   if (!defined $childpid)
697   {
698     close $reader{$id};
699     delete $reader{$id};
700     next;
701   }
702   shift @freeslot;
703   $proc{$childpid} = { jobstep => $id,
704                        time => time,
705                        slot => $childslot,
706                        jobstepname => "$job_id.$id.$childpid",
707                      };
708   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
709   $slot[$childslot]->{pid} = $childpid;
710
711   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
712   Log ($id, "child $childpid started on $childslotname");
713   $Jobstep->{starttime} = time;
714   $Jobstep->{node} = $childnode->{name};
715   $Jobstep->{slotindex} = $childslot;
716   delete $Jobstep->{stderr};
717   delete $Jobstep->{finishtime};
718
719   splice @jobstep_todo, $todo_ptr, 1;
720   --$todo_ptr;
721
722   $progress_is_dirty = 1;
723
724   while (!@freeslot
725          ||
726          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
727   {
728     last THISROUND if $main::please_freeze;
729     if ($main::please_info)
730     {
731       $main::please_info = 0;
732       freeze();
733       collate_output();
734       save_meta(1);
735       update_progress_stats();
736     }
737     my $gotsome
738         = readfrompipes ()
739         + reapchildren ();
740     if (!$gotsome)
741     {
742       check_refresh_wanted();
743       check_squeue();
744       update_progress_stats();
745       select (undef, undef, undef, 0.1);
746     }
747     elsif (time - $progress_stats_updated >= 30)
748     {
749       update_progress_stats();
750     }
751     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
752         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
753     {
754       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
755           .($thisround_failed+$thisround_succeeded)
756           .") -- giving up on this round";
757       Log (undef, $message);
758       last THISROUND;
759     }
760
761     # move slots from freeslot to holdslot (or back to freeslot) if necessary
762     for (my $i=$#freeslot; $i>=0; $i--) {
763       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
764         push @holdslot, (splice @freeslot, $i, 1);
765       }
766     }
767     for (my $i=$#holdslot; $i>=0; $i--) {
768       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
769         push @freeslot, (splice @holdslot, $i, 1);
770       }
771     }
772
773     # give up if no nodes are succeeding
774     if (!grep { $_->{node}->{losing_streak} == 0 &&
775                     $_->{node}->{hold_count} < 4 } @slot) {
776       my $message = "Every node has failed -- giving up on this round";
777       Log (undef, $message);
778       last THISROUND;
779     }
780   }
781 }
782
783
784 push @freeslot, splice @holdslot;
785 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
786
787
788 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
789 while (%proc)
790 {
791   if ($main::please_continue) {
792     $main::please_continue = 0;
793     goto THISROUND;
794   }
795   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
796   readfrompipes ();
797   if (!reapchildren())
798   {
799     check_refresh_wanted();
800     check_squeue();
801     update_progress_stats();
802     select (undef, undef, undef, 0.1);
803     killem (keys %proc) if $main::please_freeze;
804   }
805 }
806
807 update_progress_stats();
808 freeze_if_want_freeze();
809
810
811 if (!defined $main::success)
812 {
813   if (@jobstep_todo &&
814       $thisround_succeeded == 0 &&
815       ($thisround_failed == 0 || $thisround_failed > 4))
816   {
817     my $message = "stop because $thisround_failed tasks failed and none succeeded";
818     Log (undef, $message);
819     $main::success = 0;
820   }
821   if (!@jobstep_todo)
822   {
823     $main::success = 1;
824   }
825 }
826
827 goto ONELEVEL if !defined $main::success;
828
829
830 release_allocation();
831 freeze();
832 my $collated_output = &collate_output();
833
834 if ($job_has_uuid) {
835   $Job->update_attributes('running' => 0,
836                           'success' => $collated_output && $main::success,
837                           'finished_at' => scalar gmtime)
838 }
839
840 if ($collated_output)
841 {
842   eval {
843     open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
844         or die "failed to get collated manifest: $!";
845     # Read the original manifest, and strip permission hints from it,
846     # so we can put the result in a Collection.
847     my @stripped_manifest_lines = ();
848     my $orig_manifest_text = '';
849     while (my $manifest_line = <$orig_manifest>) {
850       $orig_manifest_text .= $manifest_line;
851       my @words = split(/ /, $manifest_line, -1);
852       foreach my $ii (0..$#words) {
853         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
854           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
855         }
856       }
857       push(@stripped_manifest_lines, join(" ", @words));
858     }
859     my $stripped_manifest_text = join("", @stripped_manifest_lines);
860     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
861       'uuid' => md5_hex($stripped_manifest_text),
862       'manifest_text' => $orig_manifest_text,
863     });
864     $Job->update_attributes('output' => $output->{uuid});
865     if ($Job->{'output_is_persistent'}) {
866       $arv->{'links'}->{'create'}->execute('link' => {
867         'tail_kind' => 'arvados#user',
868         'tail_uuid' => $User->{'uuid'},
869         'head_kind' => 'arvados#collection',
870         'head_uuid' => $Job->{'output'},
871         'link_class' => 'resources',
872         'name' => 'wants',
873       });
874     }
875   };
876   if ($@) {
877     Log (undef, "Failed to register output manifest: $@");
878   }
879 }
880
881 Log (undef, "finish");
882
883 save_meta();
884 exit 0;
885
886
887
888 sub update_progress_stats
889 {
890   $progress_stats_updated = time;
891   return if !$progress_is_dirty;
892   my ($todo, $done, $running) = (scalar @jobstep_todo,
893                                  scalar @jobstep_done,
894                                  scalar @slot - scalar @freeslot - scalar @holdslot);
895   $Job->{'tasks_summary'} ||= {};
896   $Job->{'tasks_summary'}->{'todo'} = $todo;
897   $Job->{'tasks_summary'}->{'done'} = $done;
898   $Job->{'tasks_summary'}->{'running'} = $running;
899   if ($job_has_uuid) {
900     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
901   }
902   Log (undef, "status: $done done, $running running, $todo todo");
903   $progress_is_dirty = 0;
904 }
905
906
907
908 sub reapchildren
909 {
910   my $pid = waitpid (-1, WNOHANG);
911   return 0 if $pid <= 0;
912
913   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
914                   . "."
915                   . $slot[$proc{$pid}->{slot}]->{cpu});
916   my $jobstepid = $proc{$pid}->{jobstep};
917   my $elapsed = time - $proc{$pid}->{time};
918   my $Jobstep = $jobstep[$jobstepid];
919
920   my $childstatus = $?;
921   my $exitvalue = $childstatus >> 8;
922   my $exitinfo = sprintf("exit %d signal %d%s",
923                          $exitvalue,
924                          $childstatus & 127,
925                          ($childstatus & 128 ? ' core dump' : ''));
926   $Jobstep->{'arvados_task'}->reload;
927   my $task_success = $Jobstep->{'arvados_task'}->{success};
928
929   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
930
931   if (!defined $task_success) {
932     # task did not indicate one way or the other --> fail
933     $Jobstep->{'arvados_task'}->{success} = 0;
934     $Jobstep->{'arvados_task'}->save;
935     $task_success = 0;
936   }
937
938   if (!$task_success)
939   {
940     my $temporary_fail;
941     $temporary_fail ||= $Jobstep->{node_fail};
942     $temporary_fail ||= ($exitvalue == 111);
943
944     ++$thisround_failed;
945     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
946
947     # Check for signs of a failed or misconfigured node
948     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
949         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
950       # Don't count this against jobstep failure thresholds if this
951       # node is already suspected faulty and srun exited quickly
952       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
953           $elapsed < 5) {
954         Log ($jobstepid, "blaming failure on suspect node " .
955              $slot[$proc{$pid}->{slot}]->{node}->{name});
956         $temporary_fail ||= 1;
957       }
958       ban_node_by_slot($proc{$pid}->{slot});
959     }
960
961     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
962                              ++$Jobstep->{'failures'},
963                              $temporary_fail ? 'temporary ' : 'permanent',
964                              $elapsed));
965
966     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
967       # Give up on this task, and the whole job
968       $main::success = 0;
969       $main::please_freeze = 1;
970     }
971     else {
972       # Put this task back on the todo queue
973       push @jobstep_todo, $jobstepid;
974     }
975     $Job->{'tasks_summary'}->{'failed'}++;
976   }
977   else
978   {
979     ++$thisround_succeeded;
980     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
981     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
982     push @jobstep_done, $jobstepid;
983     Log ($jobstepid, "success in $elapsed seconds");
984   }
985   $Jobstep->{exitcode} = $childstatus;
986   $Jobstep->{finishtime} = time;
987   process_stderr ($jobstepid, $task_success);
988   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
989
990   close $reader{$jobstepid};
991   delete $reader{$jobstepid};
992   delete $slot[$proc{$pid}->{slot}]->{pid};
993   push @freeslot, $proc{$pid}->{slot};
994   delete $proc{$pid};
995
996   # Load new tasks
997   my $newtask_list = [];
998   my $newtask_results;
999   do {
1000     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1001       'where' => {
1002         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1003       },
1004       'order' => 'qsequence',
1005       'offset' => scalar(@$newtask_list),
1006     );
1007     push(@$newtask_list, @{$newtask_results->{items}});
1008   } while (@{$newtask_results->{items}});
1009   foreach my $arvados_task (@$newtask_list) {
1010     my $jobstep = {
1011       'level' => $arvados_task->{'sequence'},
1012       'failures' => 0,
1013       'arvados_task' => $arvados_task
1014     };
1015     push @jobstep, $jobstep;
1016     push @jobstep_todo, $#jobstep;
1017   }
1018
1019   $progress_is_dirty = 1;
1020   1;
1021 }
1022
1023 sub check_refresh_wanted
1024 {
1025   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1026   if (@stat && $stat[9] > $latest_refresh) {
1027     $latest_refresh = scalar time;
1028     if ($job_has_uuid) {
1029       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1030       for my $attr ('cancelled_at',
1031                     'cancelled_by_user_uuid',
1032                     'cancelled_by_client_uuid') {
1033         $Job->{$attr} = $Job2->{$attr};
1034       }
1035       if ($Job->{'cancelled_at'}) {
1036         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1037              " by user " . $Job->{cancelled_by_user_uuid});
1038         $main::success = 0;
1039         $main::please_freeze = 1;
1040       }
1041     }
1042   }
1043 }
1044
1045 sub check_squeue
1046 {
1047   # return if the kill list was checked <4 seconds ago
1048   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1049   {
1050     return;
1051   }
1052   $squeue_kill_checked = time;
1053
1054   # use killem() on procs whose killtime is reached
1055   for (keys %proc)
1056   {
1057     if (exists $proc{$_}->{killtime}
1058         && $proc{$_}->{killtime} <= time)
1059     {
1060       killem ($_);
1061     }
1062   }
1063
1064   # return if the squeue was checked <60 seconds ago
1065   if (defined $squeue_checked && $squeue_checked > time - 60)
1066   {
1067     return;
1068   }
1069   $squeue_checked = time;
1070
1071   if (!$have_slurm)
1072   {
1073     # here is an opportunity to check for mysterious problems with local procs
1074     return;
1075   }
1076
1077   # get a list of steps still running
1078   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1079   chop @squeue;
1080   if ($squeue[-1] ne "ok")
1081   {
1082     return;
1083   }
1084   pop @squeue;
1085
1086   # which of my jobsteps are running, according to squeue?
1087   my %ok;
1088   foreach (@squeue)
1089   {
1090     if (/^(\d+)\.(\d+) (\S+)/)
1091     {
1092       if ($1 eq $ENV{SLURM_JOBID})
1093       {
1094         $ok{$3} = 1;
1095       }
1096     }
1097   }
1098
1099   # which of my active child procs (>60s old) were not mentioned by squeue?
1100   foreach (keys %proc)
1101   {
1102     if ($proc{$_}->{time} < time - 60
1103         && !exists $ok{$proc{$_}->{jobstepname}}
1104         && !exists $proc{$_}->{killtime})
1105     {
1106       # kill this proc if it hasn't exited in 30 seconds
1107       $proc{$_}->{killtime} = time + 30;
1108     }
1109   }
1110 }
1111
1112
1113 sub release_allocation
1114 {
1115   if ($have_slurm)
1116   {
1117     Log (undef, "release job allocation");
1118     system "scancel $ENV{SLURM_JOBID}";
1119   }
1120 }
1121
1122
1123 sub readfrompipes
1124 {
1125   my $gotsome = 0;
1126   foreach my $job (keys %reader)
1127   {
1128     my $buf;
1129     while (0 < sysread ($reader{$job}, $buf, 8192))
1130     {
1131       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1132       $jobstep[$job]->{stderr} .= $buf;
1133       preprocess_stderr ($job);
1134       if (length ($jobstep[$job]->{stderr}) > 16384)
1135       {
1136         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1137       }
1138       $gotsome = 1;
1139     }
1140   }
1141   return $gotsome;
1142 }
1143
1144
1145 sub preprocess_stderr
1146 {
1147   my $job = shift;
1148
1149   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1150     my $line = $1;
1151     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1152     Log ($job, "stderr $line");
1153     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1154       # whoa.
1155       $main::please_freeze = 1;
1156     }
1157     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1158       $jobstep[$job]->{node_fail} = 1;
1159       ban_node_by_slot($jobstep[$job]->{slotindex});
1160     }
1161   }
1162 }
1163
1164
1165 sub process_stderr
1166 {
1167   my $job = shift;
1168   my $task_success = shift;
1169   preprocess_stderr ($job);
1170
1171   map {
1172     Log ($job, "stderr $_");
1173   } split ("\n", $jobstep[$job]->{stderr});
1174 }
1175
1176 sub fetch_block
1177 {
1178   my $hash = shift;
1179   my ($keep, $child_out, $output_block);
1180
1181   my $cmd = "$arv_cli keep get \Q$hash\E";
1182   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1183   sysread($keep, $output_block, 64 * 1024 * 1024);
1184   close $keep;
1185   return $output_block;
1186 }
1187
1188 sub collate_output
1189 {
1190   Log (undef, "collate");
1191
1192   my ($child_out, $child_in);
1193   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1194   my $joboutput;
1195   for (@jobstep)
1196   {
1197     next if (!exists $_->{'arvados_task'}->{output} ||
1198              !$_->{'arvados_task'}->{'success'} ||
1199              $_->{'exitcode'} != 0);
1200     my $output = $_->{'arvados_task'}->{output};
1201     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1202     {
1203       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1204       print $child_in $output;
1205     }
1206     elsif (@jobstep == 1)
1207     {
1208       $joboutput = $output;
1209       last;
1210     }
1211     elsif (defined (my $outblock = fetch_block ($output)))
1212     {
1213       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1214       print $child_in $outblock;
1215     }
1216     else
1217     {
1218       Log (undef, "XXX fetch_block($output) failed XXX");
1219       $main::success = 0;
1220     }
1221   }
1222   $child_in->close;
1223
1224   if (!defined $joboutput) {
1225     my $s = IO::Select->new($child_out);
1226     if ($s->can_read(120)) {
1227       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1228       chomp($joboutput);
1229     } else {
1230       Log (undef, "timed out reading from 'arv keep put'");
1231     }
1232   }
1233   waitpid($pid, 0);
1234
1235   if ($joboutput)
1236   {
1237     Log (undef, "output $joboutput");
1238     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1239   }
1240   else
1241   {
1242     Log (undef, "output undef");
1243   }
1244   return $joboutput;
1245 }
1246
1247
1248 sub killem
1249 {
1250   foreach (@_)
1251   {
1252     my $sig = 2;                # SIGINT first
1253     if (exists $proc{$_}->{"sent_$sig"} &&
1254         time - $proc{$_}->{"sent_$sig"} > 4)
1255     {
1256       $sig = 15;                # SIGTERM if SIGINT doesn't work
1257     }
1258     if (exists $proc{$_}->{"sent_$sig"} &&
1259         time - $proc{$_}->{"sent_$sig"} > 4)
1260     {
1261       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1262     }
1263     if (!exists $proc{$_}->{"sent_$sig"})
1264     {
1265       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1266       kill $sig, $_;
1267       select (undef, undef, undef, 0.1);
1268       if ($sig == 2)
1269       {
1270         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1271       }
1272       $proc{$_}->{"sent_$sig"} = time;
1273       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1274     }
1275   }
1276 }
1277
1278
1279 sub fhbits
1280 {
1281   my($bits);
1282   for (@_) {
1283     vec($bits,fileno($_),1) = 1;
1284   }
1285   $bits;
1286 }
1287
1288
1289 sub Log                         # ($jobstep_id, $logmessage)
1290 {
1291   if ($_[1] =~ /\n/) {
1292     for my $line (split (/\n/, $_[1])) {
1293       Log ($_[0], $line);
1294     }
1295     return;
1296   }
1297   my $fh = select STDERR; $|=1; select $fh;
1298   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1299   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1300   $message .= "\n";
1301   my $datetime;
1302   if ($local_logfile || -t STDERR) {
1303     my @gmtime = gmtime;
1304     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1305                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1306   }
1307   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1308
1309   if ($local_logfile) {
1310     print $local_logfile $datetime . " " . $message;
1311   }
1312 }
1313
1314
1315 sub croak
1316 {
1317   my ($package, $file, $line) = caller;
1318   my $message = "@_ at $file line $line\n";
1319   Log (undef, $message);
1320   freeze() if @jobstep_todo;
1321   collate_output() if @jobstep_todo;
1322   cleanup();
1323   save_meta() if $local_logfile;
1324   die;
1325 }
1326
1327
1328 sub cleanup
1329 {
1330   return if !$job_has_uuid;
1331   $Job->update_attributes('running' => 0,
1332                           'success' => 0,
1333                           'finished_at' => scalar gmtime);
1334 }
1335
1336
1337 sub save_meta
1338 {
1339   my $justcheckpoint = shift; # false if this will be the last meta saved
1340   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1341
1342   $local_logfile->flush;
1343   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1344       . quotemeta($local_logfile->filename);
1345   my $loglocator = `$cmd`;
1346   die "system $cmd failed: $?" if $?;
1347   chomp($loglocator);
1348
1349   $local_logfile = undef;   # the temp file is automatically deleted
1350   Log (undef, "log manifest is $loglocator");
1351   $Job->{'log'} = $loglocator;
1352   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1353 }
1354
1355
1356 sub freeze_if_want_freeze
1357 {
1358   if ($main::please_freeze)
1359   {
1360     release_allocation();
1361     if (@_)
1362     {
1363       # kill some srun procs before freeze+stop
1364       map { $proc{$_} = {} } @_;
1365       while (%proc)
1366       {
1367         killem (keys %proc);
1368         select (undef, undef, undef, 0.1);
1369         my $died;
1370         while (($died = waitpid (-1, WNOHANG)) > 0)
1371         {
1372           delete $proc{$died};
1373         }
1374       }
1375     }
1376     freeze();
1377     collate_output();
1378     cleanup();
1379     save_meta();
1380     exit 0;
1381   }
1382 }
1383
1384
1385 sub freeze
1386 {
1387   Log (undef, "Freeze not implemented");
1388   return;
1389 }
1390
1391
1392 sub thaw
1393 {
1394   croak ("Thaw not implemented");
1395 }
1396
1397
1398 sub freezequote
1399 {
1400   my $s = shift;
1401   $s =~ s/\\/\\\\/g;
1402   $s =~ s/\n/\\n/g;
1403   return $s;
1404 }
1405
1406
1407 sub freezeunquote
1408 {
1409   my $s = shift;
1410   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1411   return $s;
1412 }
1413
1414
1415 sub srun
1416 {
1417   my $srunargs = shift;
1418   my $execargs = shift;
1419   my $opts = shift || {};
1420   my $stdin = shift;
1421   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1422   print STDERR (join (" ",
1423                       map { / / ? "'$_'" : $_ }
1424                       (@$args)),
1425                 "\n")
1426       if $ENV{CRUNCH_DEBUG};
1427
1428   if (defined $stdin) {
1429     my $child = open STDIN, "-|";
1430     defined $child or die "no fork: $!";
1431     if ($child == 0) {
1432       print $stdin or die $!;
1433       close STDOUT or die $!;
1434       exit 0;
1435     }
1436   }
1437
1438   return system (@$args) if $opts->{fork};
1439
1440   exec @$args;
1441   warn "ENV size is ".length(join(" ",%ENV));
1442   die "exec failed: $!: @$args";
1443 }
1444
1445
1446 sub ban_node_by_slot {
1447   # Don't start any new jobsteps on this node for 60 seconds
1448   my $slotid = shift;
1449   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1450   $slot[$slotid]->{node}->{hold_count}++;
1451   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1452 }
1453
1454 sub must_lock_now
1455 {
1456   my ($lockfile, $error_message) = @_;
1457   open L, ">", $lockfile or croak("$lockfile: $!");
1458   if (!flock L, LOCK_EX|LOCK_NB) {
1459     croak("Can't lock $lockfile: $error_message\n");
1460   }
1461 }
1462
1463 sub find_docker_hash {
1464   # Given a Keep locator, search for a matching link to find the Docker hash
1465   # of the stored image.
1466   my $locator = shift;
1467   my $links_result = $arv->{links}->{list}->execute(
1468     filters => [["head_uuid", "=", $locator],
1469                 ["link_class", "=", "docker_image_hash"]],
1470     limit => 1);
1471   my $docker_hash;
1472   foreach my $link (@{$links_result->{items}}) {
1473     $docker_hash = lc($link->{name});
1474   }
1475   return $docker_hash;
1476 }
1477
1478 __DATA__
1479 #!/usr/bin/perl
1480
1481 # checkout-and-build
1482
1483 use Fcntl ':flock';
1484
1485 my $destdir = $ENV{"CRUNCH_SRC"};
1486 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1487 my $repo = $ENV{"CRUNCH_SRC_URL"};
1488
1489 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1490 flock L, LOCK_EX;
1491 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1492     exit 0;
1493 }
1494
1495 unlink "$destdir.commit";
1496 open STDOUT, ">", "$destdir.log";
1497 open STDERR, ">&STDOUT";
1498
1499 mkdir $destdir;
1500 my @git_archive_data = <DATA>;
1501 if (@git_archive_data) {
1502   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1503   print TARX @git_archive_data;
1504   if(!close(TARX)) {
1505     die "'tar -C $destdir -xf -' exited $?: $!";
1506   }
1507 }
1508
1509 my $pwd;
1510 chomp ($pwd = `pwd`);
1511 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1512 mkdir $install_dir;
1513
1514 for my $src_path ("$destdir/arvados/sdk/python") {
1515   if (-d $src_path) {
1516     shell_or_die ("virtualenv", $install_dir);
1517     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1518   }
1519 }
1520
1521 if (-e "$destdir/crunch_scripts/install") {
1522     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1523 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1524     # Old version
1525     shell_or_die ("./tests/autotests.sh", $install_dir);
1526 } elsif (-e "./install.sh") {
1527     shell_or_die ("./install.sh", $install_dir);
1528 }
1529
1530 if ($commit) {
1531     unlink "$destdir.commit.new";
1532     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1533     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1534 }
1535
1536 close L;
1537
1538 exit 0;
1539
1540 sub shell_or_die
1541 {
1542   if ($ENV{"DEBUG"}) {
1543     print STDERR "@_\n";
1544   }
1545   system (@_) == 0
1546       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1547 }
1548
1549 __DATA__