Merge branch 'master' into 3661-copy-move-from-show
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
130
131
132 $SIG{'USR1'} = sub
133 {
134   $main::ENV{CRUNCH_DEBUG} = 1;
135 };
136 $SIG{'USR2'} = sub
137 {
138   $main::ENV{CRUNCH_DEBUG} = 0;
139 };
140
141
142
143 my $arv = Arvados->new('apiVersion' => 'v1');
144 my $local_logfile;
145
146 my $User = $arv->{'users'}->{'current'}->execute;
147
148 my $Job = {};
149 my $job_id;
150 my $dbh;
151 my $sth;
152 if ($job_has_uuid)
153 {
154   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155   if (!$force_unlock) {
156     # If some other crunch-job process has grabbed this job (or we see
157     # other evidence that the job is already underway) we exit
158     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159     # mark the job as failed.
160     if ($Job->{'is_locked_by_uuid'}) {
161       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
162       exit EX_TEMPFAIL;
163     }
164     if ($Job->{'success'} ne undef) {
165       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
166       exit EX_TEMPFAIL;
167     }
168     if ($Job->{'running'}) {
169       Log(undef, "Job 'running' flag is already set");
170       exit EX_TEMPFAIL;
171     }
172     if ($Job->{'started_at'}) {
173       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
174       exit EX_TEMPFAIL;
175     }
176   }
177 }
178 else
179 {
180   $Job = JSON::decode_json($jobspec);
181
182   if (!$resume_stash)
183   {
184     map { croak ("No $_ specified") unless $Job->{$_} }
185     qw(script script_version script_parameters);
186   }
187
188   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
189   $Job->{'started_at'} = gmtime;
190
191   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
192
193   $job_has_uuid = 1;
194 }
195 $job_id = $Job->{'uuid'};
196
197 my $keep_logfile = $job_id . '.log.txt';
198 $local_logfile = File::Temp->new();
199
200 $Job->{'runtime_constraints'} ||= {};
201 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
202 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
203
204
205 Log (undef, "check slurm allocation");
206 my @slot;
207 my @node;
208 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
209 my @sinfo;
210 if (!$have_slurm)
211 {
212   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
213   push @sinfo, "$localcpus localhost";
214 }
215 if (exists $ENV{SLURM_NODELIST})
216 {
217   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
218 }
219 foreach (@sinfo)
220 {
221   my ($ncpus, $slurm_nodelist) = split;
222   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
223
224   my @nodelist;
225   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
226   {
227     my $nodelist = $1;
228     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
229     {
230       my $ranges = $1;
231       foreach (split (",", $ranges))
232       {
233         my ($a, $b);
234         if (/(\d+)-(\d+)/)
235         {
236           $a = $1;
237           $b = $2;
238         }
239         else
240         {
241           $a = $_;
242           $b = $_;
243         }
244         push @nodelist, map {
245           my $n = $nodelist;
246           $n =~ s/\[[-,\d]+\]/$_/;
247           $n;
248         } ($a..$b);
249       }
250     }
251     else
252     {
253       push @nodelist, $nodelist;
254     }
255   }
256   foreach my $nodename (@nodelist)
257   {
258     Log (undef, "node $nodename - $ncpus slots");
259     my $node = { name => $nodename,
260                  ncpus => $ncpus,
261                  losing_streak => 0,
262                  hold_until => 0 };
263     foreach my $cpu (1..$ncpus)
264     {
265       push @slot, { node => $node,
266                     cpu => $cpu };
267     }
268   }
269   push @node, @nodelist;
270 }
271
272
273
274 # Ensure that we get one jobstep running on each allocated node before
275 # we start overloading nodes with concurrent steps
276
277 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
278
279
280
281 my $jobmanager_id;
282 if ($job_has_uuid)
283 {
284   # Claim this job, and make sure nobody else does
285   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
286           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
287     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
288     exit EX_TEMPFAIL;
289   }
290   $Job->update_attributes('started_at' => scalar gmtime,
291                           'running' => 1,
292                           'success' => undef,
293                           'tasks_summary' => { 'failed' => 0,
294                                                'todo' => 1,
295                                                'running' => 0,
296                                                'done' => 0 });
297 }
298
299
300 Log (undef, "start");
301 $SIG{'INT'} = sub { $main::please_freeze = 1; };
302 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
303 $SIG{'TERM'} = \&croak;
304 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
305 $SIG{'ALRM'} = sub { $main::please_info = 1; };
306 $SIG{'CONT'} = sub { $main::please_continue = 1; };
307 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
308
309 $main::please_freeze = 0;
310 $main::please_info = 0;
311 $main::please_continue = 0;
312 $main::please_refresh = 0;
313 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
314
315 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
316 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
317 $ENV{"JOB_UUID"} = $job_id;
318
319
320 my @jobstep;
321 my @jobstep_todo = ();
322 my @jobstep_done = ();
323 my @jobstep_tomerge = ();
324 my $jobstep_tomerge_level = 0;
325 my $squeue_checked;
326 my $squeue_kill_checked;
327 my $output_in_keep = 0;
328 my $latest_refresh = scalar time;
329
330
331
332 if (defined $Job->{thawedfromkey})
333 {
334   thaw ($Job->{thawedfromkey});
335 }
336 else
337 {
338   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
339     'job_uuid' => $Job->{'uuid'},
340     'sequence' => 0,
341     'qsequence' => 0,
342     'parameters' => {},
343                                                           });
344   push @jobstep, { 'level' => 0,
345                    'failures' => 0,
346                    'arvados_task' => $first_task,
347                  };
348   push @jobstep_todo, 0;
349 }
350
351
352 if (!$have_slurm)
353 {
354   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
355 }
356
357
358 my $build_script;
359
360
361 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
362
363 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
364 if ($skip_install)
365 {
366   if (!defined $no_clear_tmp) {
367     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
368     system($clear_tmp_cmd) == 0
369         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
370   }
371   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
372   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
373     if (-d $src_path) {
374       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
375           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
376       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
377           == 0
378           or croak ("setup.py in $src_path failed: exit ".($?>>8));
379     }
380   }
381 }
382 else
383 {
384   do {
385     local $/ = undef;
386     $build_script = <DATA>;
387   };
388   Log (undef, "Install revision ".$Job->{script_version});
389   my $nodelist = join(",", @node);
390
391   if (!defined $no_clear_tmp) {
392     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
393
394     my $cleanpid = fork();
395     if ($cleanpid == 0)
396     {
397       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
399       exit (1);
400     }
401     while (1)
402     {
403       last if $cleanpid == waitpid (-1, WNOHANG);
404       freeze_if_want_freeze ($cleanpid);
405       select (undef, undef, undef, 0.1);
406     }
407     Log (undef, "Clean-work-dir exited $?");
408   }
409
410   # Install requested code version
411
412   my @execargs;
413   my @srunargs = ("srun",
414                   "--nodelist=$nodelist",
415                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
416
417   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
418   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
419
420   my $commit;
421   my $git_archive;
422   my $treeish = $Job->{'script_version'};
423
424   # If we're running under crunch-dispatch, it will have pulled the
425   # appropriate source tree into its own repository, and given us that
426   # repo's path as $git_dir. If we're running a "local" job, and a
427   # script_version was specified, it's up to the user to provide the
428   # full path to a local repository in Job->{repository}.
429   #
430   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
431   # git-archive --remote where appropriate.
432   #
433   # TODO: Accept a locally-hosted Arvados repository by name or
434   # UUID. Use arvados.v1.repositories.list or .get to figure out the
435   # appropriate fetch-url.
436   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
437
438   $ENV{"CRUNCH_SRC_URL"} = $repo;
439
440   if (-d "$repo/.git") {
441     # We were given a working directory, but we are only interested in
442     # the index.
443     $repo = "$repo/.git";
444   }
445
446   # If this looks like a subversion r#, look for it in git-svn commit messages
447
448   if ($treeish =~ m{^\d{1,4}$}) {
449     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
450     chomp $gitlog;
451     Log(undef, "git Subversion search exited $?");
452     if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
453       $commit = $gitlog;
454       Log(undef, "Using commit $commit for Subversion revision $treeish");
455     }
456   }
457
458   # If that didn't work, try asking git to look it up as a tree-ish.
459
460   if (!defined $commit) {
461     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
462     chomp $found;
463     Log(undef, "git rev-list exited $? with result '$found'");
464     if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
465       $commit = $found;
466       Log(undef, "Using commit $commit for tree-ish $treeish");
467       if ($commit ne $treeish) {
468         # Make sure we record the real commit id in the database,
469         # frozentokey, logs, etc. -- instead of an abbreviation or a
470         # branch name which can become ambiguous or point to a
471         # different commit in the future.
472         $Job->{'script_version'} = $commit;
473         !$job_has_uuid or
474             $Job->update_attributes('script_version' => $commit) or
475             croak("Error while updating job");
476       }
477     }
478   }
479
480   if (defined $commit) {
481     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482     @execargs = ("sh", "-c",
483                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485     croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
486   }
487   else {
488     croak ("could not figure out commit id for $treeish");
489   }
490
491   # Note: this section is almost certainly unnecessary if we're
492   # running tasks in docker containers.
493   my $installpid = fork();
494   if ($installpid == 0)
495   {
496     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
497     exit (1);
498   }
499   while (1)
500   {
501     last if $installpid == waitpid (-1, WNOHANG);
502     freeze_if_want_freeze ($installpid);
503     select (undef, undef, undef, 0.1);
504   }
505   Log (undef, "Install exited $?");
506 }
507
508 if (!$have_slurm)
509 {
510   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
511   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
512 }
513
514 # If this job requires a Docker image, install that.
515 my $docker_bin = "/usr/bin/docker.io";
516 my ($docker_locator, $docker_stream, $docker_hash);
517 if ($docker_locator = $Job->{docker_image_locator}) {
518   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
519   if (!$docker_hash)
520   {
521     croak("No Docker image hash found from locator $docker_locator");
522   }
523   $docker_stream =~ s/^\.//;
524   my $docker_install_script = qq{
525 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
526     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
527 fi
528 };
529   my $docker_pid = fork();
530   if ($docker_pid == 0)
531   {
532     srun (["srun", "--nodelist=" . join(',', @node)],
533           ["/bin/sh", "-ec", $docker_install_script]);
534     exit ($?);
535   }
536   while (1)
537   {
538     last if $docker_pid == waitpid (-1, WNOHANG);
539     freeze_if_want_freeze ($docker_pid);
540     select (undef, undef, undef, 0.1);
541   }
542   if ($? != 0)
543   {
544     croak("Installing Docker image from $docker_locator returned exit code $?");
545   }
546 }
547
548 foreach (qw (script script_version script_parameters runtime_constraints))
549 {
550   Log (undef,
551        "$_ " .
552        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
553 }
554 foreach (split (/\n/, $Job->{knobs}))
555 {
556   Log (undef, "knob " . $_);
557 }
558
559
560
561 $main::success = undef;
562
563
564
565 ONELEVEL:
566
567 my $thisround_succeeded = 0;
568 my $thisround_failed = 0;
569 my $thisround_failed_multiple = 0;
570
571 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
572                        or $a <=> $b } @jobstep_todo;
573 my $level = $jobstep[$jobstep_todo[0]]->{level};
574 Log (undef, "start level $level");
575
576
577
578 my %proc;
579 my @freeslot = (0..$#slot);
580 my @holdslot;
581 my %reader;
582 my $progress_is_dirty = 1;
583 my $progress_stats_updated = 0;
584
585 update_progress_stats();
586
587
588
589 THISROUND:
590 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
591 {
592   my $id = $jobstep_todo[$todo_ptr];
593   my $Jobstep = $jobstep[$id];
594   if ($Jobstep->{level} != $level)
595   {
596     next;
597   }
598
599   pipe $reader{$id}, "writer" or croak ($!);
600   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
601   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
602
603   my $childslot = $freeslot[0];
604   my $childnode = $slot[$childslot]->{node};
605   my $childslotname = join (".",
606                             $slot[$childslot]->{node}->{name},
607                             $slot[$childslot]->{cpu});
608   my $childpid = fork();
609   if ($childpid == 0)
610   {
611     $SIG{'INT'} = 'DEFAULT';
612     $SIG{'QUIT'} = 'DEFAULT';
613     $SIG{'TERM'} = 'DEFAULT';
614
615     foreach (values (%reader))
616     {
617       close($_);
618     }
619     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
620     open(STDOUT,">&writer");
621     open(STDERR,">&writer");
622
623     undef $dbh;
624     undef $sth;
625
626     delete $ENV{"GNUPGHOME"};
627     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
628     $ENV{"TASK_QSEQUENCE"} = $id;
629     $ENV{"TASK_SEQUENCE"} = $level;
630     $ENV{"JOB_SCRIPT"} = $Job->{script};
631     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
632       $param =~ tr/a-z/A-Z/;
633       $ENV{"JOB_PARAMETER_$param"} = $value;
634     }
635     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
636     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
637     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
638     $ENV{"HOME"} = $ENV{"TASK_WORK"};
639     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
640     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
641     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
642     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
643
644     $ENV{"GZIP"} = "-n";
645
646     my @srunargs = (
647       "srun",
648       "--nodelist=".$childnode->{name},
649       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
650       "--job-name=$job_id.$id.$$",
651         );
652     my $build_script_to_send = "";
653     my $command =
654         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
655         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
656         ."&& cd $ENV{CRUNCH_TMP} ";
657     if ($build_script)
658     {
659       $build_script_to_send = $build_script;
660       $command .=
661           "&& perl -";
662     }
663     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
664     if ($docker_hash)
665     {
666       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
667       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
668       # Dynamically configure the container to use the host system as its
669       # DNS server.  Get the host's global addresses from the ip command,
670       # and turn them into docker --dns options using gawk.
671       $command .=
672           q{$(ip -o address show scope global |
673               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
674       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
675       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
676       $command .= "--env=\QHOME=/home/crunch\E ";
677       while (my ($env_key, $env_val) = each %ENV)
678       {
679         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
680           if ($env_key eq "TASK_WORK") {
681             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
682           }
683           elsif ($env_key eq "TASK_KEEPMOUNT") {
684             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
685           }
686           else {
687             $command .= "--env=\Q$env_key=$env_val\E ";
688           }
689         }
690       }
691       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
692       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
693       $command .= "\Q$docker_hash\E ";
694       $command .= "stdbuf --output=0 --error=0 ";
695       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
696     } else {
697       # Non-docker run
698       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
699       $command .= "stdbuf --output=0 --error=0 ";
700       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
701     }
702
703     my @execargs = ('bash', '-c', $command);
704     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
705     # exec() failed, we assume nothing happened.
706     Log(undef, "srun() failed on build script");
707     die;
708   }
709   close("writer");
710   if (!defined $childpid)
711   {
712     close $reader{$id};
713     delete $reader{$id};
714     next;
715   }
716   shift @freeslot;
717   $proc{$childpid} = { jobstep => $id,
718                        time => time,
719                        slot => $childslot,
720                        jobstepname => "$job_id.$id.$childpid",
721                      };
722   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
723   $slot[$childslot]->{pid} = $childpid;
724
725   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
726   Log ($id, "child $childpid started on $childslotname");
727   $Jobstep->{starttime} = time;
728   $Jobstep->{node} = $childnode->{name};
729   $Jobstep->{slotindex} = $childslot;
730   delete $Jobstep->{stderr};
731   delete $Jobstep->{finishtime};
732
733   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
734   $Jobstep->{'arvados_task'}->save;
735
736   splice @jobstep_todo, $todo_ptr, 1;
737   --$todo_ptr;
738
739   $progress_is_dirty = 1;
740
741   while (!@freeslot
742          ||
743          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
744   {
745     last THISROUND if $main::please_freeze;
746     if ($main::please_info)
747     {
748       $main::please_info = 0;
749       freeze();
750       collate_output();
751       save_meta(1);
752       update_progress_stats();
753     }
754     my $gotsome
755         = readfrompipes ()
756         + reapchildren ();
757     if (!$gotsome)
758     {
759       check_refresh_wanted();
760       check_squeue();
761       update_progress_stats();
762       select (undef, undef, undef, 0.1);
763     }
764     elsif (time - $progress_stats_updated >= 30)
765     {
766       update_progress_stats();
767     }
768     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
769         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
770     {
771       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
772           .($thisround_failed+$thisround_succeeded)
773           .") -- giving up on this round";
774       Log (undef, $message);
775       last THISROUND;
776     }
777
778     # move slots from freeslot to holdslot (or back to freeslot) if necessary
779     for (my $i=$#freeslot; $i>=0; $i--) {
780       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
781         push @holdslot, (splice @freeslot, $i, 1);
782       }
783     }
784     for (my $i=$#holdslot; $i>=0; $i--) {
785       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
786         push @freeslot, (splice @holdslot, $i, 1);
787       }
788     }
789
790     # give up if no nodes are succeeding
791     if (!grep { $_->{node}->{losing_streak} == 0 &&
792                     $_->{node}->{hold_count} < 4 } @slot) {
793       my $message = "Every node has failed -- giving up on this round";
794       Log (undef, $message);
795       last THISROUND;
796     }
797   }
798 }
799
800
801 push @freeslot, splice @holdslot;
802 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
803
804
805 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
806 while (%proc)
807 {
808   if ($main::please_continue) {
809     $main::please_continue = 0;
810     goto THISROUND;
811   }
812   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
813   readfrompipes ();
814   if (!reapchildren())
815   {
816     check_refresh_wanted();
817     check_squeue();
818     update_progress_stats();
819     select (undef, undef, undef, 0.1);
820     killem (keys %proc) if $main::please_freeze;
821   }
822 }
823
824 update_progress_stats();
825 freeze_if_want_freeze();
826
827
828 if (!defined $main::success)
829 {
830   if (@jobstep_todo &&
831       $thisround_succeeded == 0 &&
832       ($thisround_failed == 0 || $thisround_failed > 4))
833   {
834     my $message = "stop because $thisround_failed tasks failed and none succeeded";
835     Log (undef, $message);
836     $main::success = 0;
837   }
838   if (!@jobstep_todo)
839   {
840     $main::success = 1;
841   }
842 }
843
844 goto ONELEVEL if !defined $main::success;
845
846
847 release_allocation();
848 freeze();
849 my $collated_output = &collate_output();
850
851 if (!$collated_output) {
852   Log(undef, "output undef");
853 }
854 else {
855   eval {
856     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
857         or die "failed to get collated manifest: $!";
858     my $orig_manifest_text = '';
859     while (my $manifest_line = <$orig_manifest>) {
860       $orig_manifest_text .= $manifest_line;
861     }
862     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
863       'manifest_text' => $orig_manifest_text,
864     });
865     Log(undef, "output uuid " . $output->{uuid});
866     Log(undef, "output hash " . $output->{portable_data_hash});
867     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
868   };
869   if ($@) {
870     Log (undef, "Failed to register output manifest: $@");
871   }
872 }
873
874 Log (undef, "finish");
875
876 save_meta();
877
878 if ($job_has_uuid) {
879   $Job->update_attributes('running' => 0,
880                           'success' => $collated_output && $main::success,
881                           'finished_at' => scalar gmtime)
882 }
883
884 exit ($Job->{'success'} ? 1 : 0);
885
886
887
888 sub update_progress_stats
889 {
890   $progress_stats_updated = time;
891   return if !$progress_is_dirty;
892   my ($todo, $done, $running) = (scalar @jobstep_todo,
893                                  scalar @jobstep_done,
894                                  scalar @slot - scalar @freeslot - scalar @holdslot);
895   $Job->{'tasks_summary'} ||= {};
896   $Job->{'tasks_summary'}->{'todo'} = $todo;
897   $Job->{'tasks_summary'}->{'done'} = $done;
898   $Job->{'tasks_summary'}->{'running'} = $running;
899   if ($job_has_uuid) {
900     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
901   }
902   Log (undef, "status: $done done, $running running, $todo todo");
903   $progress_is_dirty = 0;
904 }
905
906
907
908 sub reapchildren
909 {
910   my $pid = waitpid (-1, WNOHANG);
911   return 0 if $pid <= 0;
912
913   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
914                   . "."
915                   . $slot[$proc{$pid}->{slot}]->{cpu});
916   my $jobstepid = $proc{$pid}->{jobstep};
917   my $elapsed = time - $proc{$pid}->{time};
918   my $Jobstep = $jobstep[$jobstepid];
919
920   my $childstatus = $?;
921   my $exitvalue = $childstatus >> 8;
922   my $exitinfo = sprintf("exit %d signal %d%s",
923                          $exitvalue,
924                          $childstatus & 127,
925                          ($childstatus & 128 ? ' core dump' : ''));
926   $Jobstep->{'arvados_task'}->reload;
927   my $task_success = $Jobstep->{'arvados_task'}->{success};
928
929   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
930
931   if (!defined $task_success) {
932     # task did not indicate one way or the other --> fail
933     $Jobstep->{'arvados_task'}->{success} = 0;
934     $Jobstep->{'arvados_task'}->save;
935     $task_success = 0;
936   }
937
938   if (!$task_success)
939   {
940     my $temporary_fail;
941     $temporary_fail ||= $Jobstep->{node_fail};
942     $temporary_fail ||= ($exitvalue == 111);
943
944     ++$thisround_failed;
945     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
946
947     # Check for signs of a failed or misconfigured node
948     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
949         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
950       # Don't count this against jobstep failure thresholds if this
951       # node is already suspected faulty and srun exited quickly
952       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
953           $elapsed < 5) {
954         Log ($jobstepid, "blaming failure on suspect node " .
955              $slot[$proc{$pid}->{slot}]->{node}->{name});
956         $temporary_fail ||= 1;
957       }
958       ban_node_by_slot($proc{$pid}->{slot});
959     }
960
961     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
962                              ++$Jobstep->{'failures'},
963                              $temporary_fail ? 'temporary ' : 'permanent',
964                              $elapsed));
965
966     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
967       # Give up on this task, and the whole job
968       $main::success = 0;
969       $main::please_freeze = 1;
970     }
971     else {
972       # Put this task back on the todo queue
973       push @jobstep_todo, $jobstepid;
974     }
975     $Job->{'tasks_summary'}->{'failed'}++;
976   }
977   else
978   {
979     ++$thisround_succeeded;
980     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
981     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
982     push @jobstep_done, $jobstepid;
983     Log ($jobstepid, "success in $elapsed seconds");
984   }
985   $Jobstep->{exitcode} = $childstatus;
986   $Jobstep->{finishtime} = time;
987   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
988   $Jobstep->{'arvados_task'}->save;
989   process_stderr ($jobstepid, $task_success);
990   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
991
992   close $reader{$jobstepid};
993   delete $reader{$jobstepid};
994   delete $slot[$proc{$pid}->{slot}]->{pid};
995   push @freeslot, $proc{$pid}->{slot};
996   delete $proc{$pid};
997
998   if ($task_success) {
999     # Load new tasks
1000     my $newtask_list = [];
1001     my $newtask_results;
1002     do {
1003       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1004         'where' => {
1005           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1006         },
1007         'order' => 'qsequence',
1008         'offset' => scalar(@$newtask_list),
1009       );
1010       push(@$newtask_list, @{$newtask_results->{items}});
1011     } while (@{$newtask_results->{items}});
1012     foreach my $arvados_task (@$newtask_list) {
1013       my $jobstep = {
1014         'level' => $arvados_task->{'sequence'},
1015         'failures' => 0,
1016         'arvados_task' => $arvados_task
1017       };
1018       push @jobstep, $jobstep;
1019       push @jobstep_todo, $#jobstep;
1020     }
1021   }
1022
1023   $progress_is_dirty = 1;
1024   1;
1025 }
1026
1027 sub check_refresh_wanted
1028 {
1029   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1030   if (@stat && $stat[9] > $latest_refresh) {
1031     $latest_refresh = scalar time;
1032     if ($job_has_uuid) {
1033       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1034       for my $attr ('cancelled_at',
1035                     'cancelled_by_user_uuid',
1036                     'cancelled_by_client_uuid') {
1037         $Job->{$attr} = $Job2->{$attr};
1038       }
1039       if ($Job->{'cancelled_at'}) {
1040         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1041              " by user " . $Job->{cancelled_by_user_uuid});
1042         $main::success = 0;
1043         $main::please_freeze = 1;
1044       }
1045     }
1046   }
1047 }
1048
1049 sub check_squeue
1050 {
1051   # return if the kill list was checked <4 seconds ago
1052   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1053   {
1054     return;
1055   }
1056   $squeue_kill_checked = time;
1057
1058   # use killem() on procs whose killtime is reached
1059   for (keys %proc)
1060   {
1061     if (exists $proc{$_}->{killtime}
1062         && $proc{$_}->{killtime} <= time)
1063     {
1064       killem ($_);
1065     }
1066   }
1067
1068   # return if the squeue was checked <60 seconds ago
1069   if (defined $squeue_checked && $squeue_checked > time - 60)
1070   {
1071     return;
1072   }
1073   $squeue_checked = time;
1074
1075   if (!$have_slurm)
1076   {
1077     # here is an opportunity to check for mysterious problems with local procs
1078     return;
1079   }
1080
1081   # get a list of steps still running
1082   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1083   chop @squeue;
1084   if ($squeue[-1] ne "ok")
1085   {
1086     return;
1087   }
1088   pop @squeue;
1089
1090   # which of my jobsteps are running, according to squeue?
1091   my %ok;
1092   foreach (@squeue)
1093   {
1094     if (/^(\d+)\.(\d+) (\S+)/)
1095     {
1096       if ($1 eq $ENV{SLURM_JOBID})
1097       {
1098         $ok{$3} = 1;
1099       }
1100     }
1101   }
1102
1103   # which of my active child procs (>60s old) were not mentioned by squeue?
1104   foreach (keys %proc)
1105   {
1106     if ($proc{$_}->{time} < time - 60
1107         && !exists $ok{$proc{$_}->{jobstepname}}
1108         && !exists $proc{$_}->{killtime})
1109     {
1110       # kill this proc if it hasn't exited in 30 seconds
1111       $proc{$_}->{killtime} = time + 30;
1112     }
1113   }
1114 }
1115
1116
1117 sub release_allocation
1118 {
1119   if ($have_slurm)
1120   {
1121     Log (undef, "release job allocation");
1122     system "scancel $ENV{SLURM_JOBID}";
1123   }
1124 }
1125
1126
1127 sub readfrompipes
1128 {
1129   my $gotsome = 0;
1130   foreach my $job (keys %reader)
1131   {
1132     my $buf;
1133     while (0 < sysread ($reader{$job}, $buf, 8192))
1134     {
1135       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1136       $jobstep[$job]->{stderr} .= $buf;
1137       preprocess_stderr ($job);
1138       if (length ($jobstep[$job]->{stderr}) > 16384)
1139       {
1140         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1141       }
1142       $gotsome = 1;
1143     }
1144   }
1145   return $gotsome;
1146 }
1147
1148
1149 sub preprocess_stderr
1150 {
1151   my $job = shift;
1152
1153   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1154     my $line = $1;
1155     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1156     Log ($job, "stderr $line");
1157     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1158       # whoa.
1159       $main::please_freeze = 1;
1160     }
1161     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1162       $jobstep[$job]->{node_fail} = 1;
1163       ban_node_by_slot($jobstep[$job]->{slotindex});
1164     }
1165   }
1166 }
1167
1168
1169 sub process_stderr
1170 {
1171   my $job = shift;
1172   my $task_success = shift;
1173   preprocess_stderr ($job);
1174
1175   map {
1176     Log ($job, "stderr $_");
1177   } split ("\n", $jobstep[$job]->{stderr});
1178 }
1179
1180 sub fetch_block
1181 {
1182   my $hash = shift;
1183   my ($keep, $child_out, $output_block);
1184
1185   my $cmd = "arv-get \Q$hash\E";
1186   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1187   $output_block = '';
1188   while (1) {
1189     my $buf;
1190     my $bytes = sysread($keep, $buf, 1024 * 1024);
1191     if (!defined $bytes) {
1192       die "reading from arv-get: $!";
1193     } elsif ($bytes == 0) {
1194       # sysread returns 0 at the end of the pipe.
1195       last;
1196     } else {
1197       # some bytes were read into buf.
1198       $output_block .= $buf;
1199     }
1200   }
1201   close $keep;
1202   return $output_block;
1203 }
1204
1205 sub collate_output
1206 {
1207   Log (undef, "collate");
1208
1209   my ($child_out, $child_in);
1210   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1211                   '--retries', put_retry_count());
1212   my $joboutput;
1213   for (@jobstep)
1214   {
1215     next if (!exists $_->{'arvados_task'}->{'output'} ||
1216              !$_->{'arvados_task'}->{'success'});
1217     my $output = $_->{'arvados_task'}->{output};
1218     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1219     {
1220       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1221       print $child_in $output;
1222     }
1223     elsif (@jobstep == 1)
1224     {
1225       $joboutput = $output;
1226       last;
1227     }
1228     elsif (defined (my $outblock = fetch_block ($output)))
1229     {
1230       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1231       print $child_in $outblock;
1232     }
1233     else
1234     {
1235       Log (undef, "XXX fetch_block($output) failed XXX");
1236       $main::success = 0;
1237     }
1238   }
1239   $child_in->close;
1240
1241   if (!defined $joboutput) {
1242     my $s = IO::Select->new($child_out);
1243     if ($s->can_read(120)) {
1244       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1245       chomp($joboutput);
1246     } else {
1247       Log (undef, "timed out reading from 'arv-put'");
1248     }
1249   }
1250   waitpid($pid, 0);
1251
1252   return $joboutput;
1253 }
1254
1255
1256 sub killem
1257 {
1258   foreach (@_)
1259   {
1260     my $sig = 2;                # SIGINT first
1261     if (exists $proc{$_}->{"sent_$sig"} &&
1262         time - $proc{$_}->{"sent_$sig"} > 4)
1263     {
1264       $sig = 15;                # SIGTERM if SIGINT doesn't work
1265     }
1266     if (exists $proc{$_}->{"sent_$sig"} &&
1267         time - $proc{$_}->{"sent_$sig"} > 4)
1268     {
1269       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1270     }
1271     if (!exists $proc{$_}->{"sent_$sig"})
1272     {
1273       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1274       kill $sig, $_;
1275       select (undef, undef, undef, 0.1);
1276       if ($sig == 2)
1277       {
1278         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1279       }
1280       $proc{$_}->{"sent_$sig"} = time;
1281       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1282     }
1283   }
1284 }
1285
1286
1287 sub fhbits
1288 {
1289   my($bits);
1290   for (@_) {
1291     vec($bits,fileno($_),1) = 1;
1292   }
1293   $bits;
1294 }
1295
1296
1297 sub Log                         # ($jobstep_id, $logmessage)
1298 {
1299   if ($_[1] =~ /\n/) {
1300     for my $line (split (/\n/, $_[1])) {
1301       Log ($_[0], $line);
1302     }
1303     return;
1304   }
1305   my $fh = select STDERR; $|=1; select $fh;
1306   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1307   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1308   $message .= "\n";
1309   my $datetime;
1310   if ($local_logfile || -t STDERR) {
1311     my @gmtime = gmtime;
1312     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1313                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1314   }
1315   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1316
1317   if ($local_logfile) {
1318     print $local_logfile $datetime . " " . $message;
1319   }
1320 }
1321
1322
1323 sub croak
1324 {
1325   my ($package, $file, $line) = caller;
1326   my $message = "@_ at $file line $line\n";
1327   Log (undef, $message);
1328   freeze() if @jobstep_todo;
1329   collate_output() if @jobstep_todo;
1330   cleanup();
1331   save_meta() if $local_logfile;
1332   die;
1333 }
1334
1335
1336 sub cleanup
1337 {
1338   return if !$job_has_uuid;
1339   $Job->update_attributes('running' => 0,
1340                           'success' => 0,
1341                           'finished_at' => scalar gmtime);
1342 }
1343
1344
1345 sub save_meta
1346 {
1347   my $justcheckpoint = shift; # false if this will be the last meta saved
1348   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1349
1350   $local_logfile->flush;
1351   my $retry_count = put_retry_count();
1352   my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1353       "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1354   my $loglocator = `$cmd`;
1355   die "system $cmd failed: $?" if $?;
1356   chomp($loglocator);
1357
1358   $local_logfile = undef;   # the temp file is automatically deleted
1359   Log (undef, "log manifest is $loglocator");
1360   $Job->{'log'} = $loglocator;
1361   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1362 }
1363
1364
1365 sub freeze_if_want_freeze
1366 {
1367   if ($main::please_freeze)
1368   {
1369     release_allocation();
1370     if (@_)
1371     {
1372       # kill some srun procs before freeze+stop
1373       map { $proc{$_} = {} } @_;
1374       while (%proc)
1375       {
1376         killem (keys %proc);
1377         select (undef, undef, undef, 0.1);
1378         my $died;
1379         while (($died = waitpid (-1, WNOHANG)) > 0)
1380         {
1381           delete $proc{$died};
1382         }
1383       }
1384     }
1385     freeze();
1386     collate_output();
1387     cleanup();
1388     save_meta();
1389     exit 0;
1390   }
1391 }
1392
1393
1394 sub freeze
1395 {
1396   Log (undef, "Freeze not implemented");
1397   return;
1398 }
1399
1400
1401 sub thaw
1402 {
1403   croak ("Thaw not implemented");
1404 }
1405
1406
1407 sub freezequote
1408 {
1409   my $s = shift;
1410   $s =~ s/\\/\\\\/g;
1411   $s =~ s/\n/\\n/g;
1412   return $s;
1413 }
1414
1415
1416 sub freezeunquote
1417 {
1418   my $s = shift;
1419   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1420   return $s;
1421 }
1422
1423
1424 sub srun
1425 {
1426   my $srunargs = shift;
1427   my $execargs = shift;
1428   my $opts = shift || {};
1429   my $stdin = shift;
1430   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1431   print STDERR (join (" ",
1432                       map { / / ? "'$_'" : $_ }
1433                       (@$args)),
1434                 "\n")
1435       if $ENV{CRUNCH_DEBUG};
1436
1437   if (defined $stdin) {
1438     my $child = open STDIN, "-|";
1439     defined $child or die "no fork: $!";
1440     if ($child == 0) {
1441       print $stdin or die $!;
1442       close STDOUT or die $!;
1443       exit 0;
1444     }
1445   }
1446
1447   return system (@$args) if $opts->{fork};
1448
1449   exec @$args;
1450   warn "ENV size is ".length(join(" ",%ENV));
1451   die "exec failed: $!: @$args";
1452 }
1453
1454
1455 sub ban_node_by_slot {
1456   # Don't start any new jobsteps on this node for 60 seconds
1457   my $slotid = shift;
1458   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1459   $slot[$slotid]->{node}->{hold_count}++;
1460   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1461 }
1462
1463 sub must_lock_now
1464 {
1465   my ($lockfile, $error_message) = @_;
1466   open L, ">", $lockfile or croak("$lockfile: $!");
1467   if (!flock L, LOCK_EX|LOCK_NB) {
1468     croak("Can't lock $lockfile: $error_message\n");
1469   }
1470 }
1471
1472 sub find_docker_image {
1473   # Given a Keep locator, check to see if it contains a Docker image.
1474   # If so, return its stream name and Docker hash.
1475   # If not, return undef for both values.
1476   my $locator = shift;
1477   my ($streamname, $filename);
1478   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1479     foreach my $line (split(/\n/, $image->{manifest_text})) {
1480       my @tokens = split(/\s+/, $line);
1481       next if (!@tokens);
1482       $streamname = shift(@tokens);
1483       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1484         if (defined($filename)) {
1485           return (undef, undef);  # More than one file in the Collection.
1486         } else {
1487           $filename = (split(/:/, $filedata, 3))[2];
1488         }
1489       }
1490     }
1491   }
1492   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1493     return ($streamname, $1);
1494   } else {
1495     return (undef, undef);
1496   }
1497 }
1498
1499 sub put_retry_count {
1500   # Calculate a --retries argument for arv-put that will have it try
1501   # approximately as long as this Job has been running.
1502   my $stoptime = shift || time;
1503   my $starttime = $jobstep[0]->{starttime};
1504   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1505   my $retries = 0;
1506   while ($timediff >= 2) {
1507     $retries++;
1508     $timediff /= 2;
1509   }
1510   return ($retries > 3) ? $retries : 3;
1511 }
1512
1513 __DATA__
1514 #!/usr/bin/perl
1515
1516 # checkout-and-build
1517
1518 use Fcntl ':flock';
1519 use File::Path qw( make_path );
1520
1521 my $destdir = $ENV{"CRUNCH_SRC"};
1522 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1523 my $repo = $ENV{"CRUNCH_SRC_URL"};
1524 my $task_work = $ENV{"TASK_WORK"};
1525
1526 for my $dir ($destdir, $task_work) {
1527     if ($dir) {
1528         make_path $dir;
1529         -e $dir or die "Failed to create temporary directory ($dir): $!";
1530     }
1531 }
1532
1533 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1534 flock L, LOCK_EX;
1535 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1536     if (@ARGV) {
1537         exec(@ARGV);
1538         die "Cannot exec `@ARGV`: $!";
1539     } else {
1540         exit 0;
1541     }
1542 }
1543
1544 unlink "$destdir.commit";
1545 open STDOUT, ">", "$destdir.log";
1546 open STDERR, ">&STDOUT";
1547
1548 mkdir $destdir;
1549 my @git_archive_data = <DATA>;
1550 if (@git_archive_data) {
1551   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1552   print TARX @git_archive_data;
1553   if(!close(TARX)) {
1554     die "'tar -C $destdir -xf -' exited $?: $!";
1555   }
1556 }
1557
1558 my $pwd;
1559 chomp ($pwd = `pwd`);
1560 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1561 mkdir $install_dir;
1562
1563 for my $src_path ("$destdir/arvados/sdk/python") {
1564   if (-d $src_path) {
1565     shell_or_die ("virtualenv", $install_dir);
1566     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1567   }
1568 }
1569
1570 if (-e "$destdir/crunch_scripts/install") {
1571     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1572 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1573     # Old version
1574     shell_or_die ("./tests/autotests.sh", $install_dir);
1575 } elsif (-e "./install.sh") {
1576     shell_or_die ("./install.sh", $install_dir);
1577 }
1578
1579 if ($commit) {
1580     unlink "$destdir.commit.new";
1581     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1582     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1583 }
1584
1585 close L;
1586
1587 if (@ARGV) {
1588     exec(@ARGV);
1589     die "Cannot exec `@ARGV`: $!";
1590 } else {
1591     exit 0;
1592 }
1593
1594 sub shell_or_die
1595 {
1596   if ($ENV{"DEBUG"}) {
1597     print STDERR "@_\n";
1598   }
1599   system (@_) == 0
1600       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1601 }
1602
1603 __DATA__