Merge branch 'master' into 3836-remove-collection-from-project-bug
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
130
131
132 $SIG{'USR1'} = sub
133 {
134   $main::ENV{CRUNCH_DEBUG} = 1;
135 };
136 $SIG{'USR2'} = sub
137 {
138   $main::ENV{CRUNCH_DEBUG} = 0;
139 };
140
141
142
143 my $arv = Arvados->new('apiVersion' => 'v1');
144 my $local_logfile;
145
146 my $User = $arv->{'users'}->{'current'}->execute;
147
148 my $Job = {};
149 my $job_id;
150 my $dbh;
151 my $sth;
152 if ($job_has_uuid)
153 {
154   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155   if (!$force_unlock) {
156     # If some other crunch-job process has grabbed this job (or we see
157     # other evidence that the job is already underway) we exit
158     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159     # mark the job as failed.
160     if ($Job->{'is_locked_by_uuid'}) {
161       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
162       exit EX_TEMPFAIL;
163     }
164     if ($Job->{'state'} ne 'Queued') {
165       Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
166       exit EX_TEMPFAIL;
167     }
168     if ($Job->{'success'} ne undef) {
169       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
170       exit EX_TEMPFAIL;
171     }
172     if ($Job->{'running'}) {
173       Log(undef, "Job 'running' flag is already set");
174       exit EX_TEMPFAIL;
175     }
176     if ($Job->{'started_at'}) {
177       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
178       exit EX_TEMPFAIL;
179     }
180   }
181 }
182 else
183 {
184   $Job = JSON::decode_json($jobspec);
185
186   if (!$resume_stash)
187   {
188     map { croak ("No $_ specified") unless $Job->{$_} }
189     qw(script script_version script_parameters);
190   }
191
192   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
193   $Job->{'started_at'} = gmtime;
194
195   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
196
197   $job_has_uuid = 1;
198 }
199 $job_id = $Job->{'uuid'};
200
201 my $keep_logfile = $job_id . '.log.txt';
202 $local_logfile = File::Temp->new();
203
204 $Job->{'runtime_constraints'} ||= {};
205 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
206 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
207
208
209 Log (undef, "check slurm allocation");
210 my @slot;
211 my @node;
212 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
213 my @sinfo;
214 if (!$have_slurm)
215 {
216   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
217   push @sinfo, "$localcpus localhost";
218 }
219 if (exists $ENV{SLURM_NODELIST})
220 {
221   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
222 }
223 foreach (@sinfo)
224 {
225   my ($ncpus, $slurm_nodelist) = split;
226   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
227
228   my @nodelist;
229   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
230   {
231     my $nodelist = $1;
232     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
233     {
234       my $ranges = $1;
235       foreach (split (",", $ranges))
236       {
237         my ($a, $b);
238         if (/(\d+)-(\d+)/)
239         {
240           $a = $1;
241           $b = $2;
242         }
243         else
244         {
245           $a = $_;
246           $b = $_;
247         }
248         push @nodelist, map {
249           my $n = $nodelist;
250           $n =~ s/\[[-,\d]+\]/$_/;
251           $n;
252         } ($a..$b);
253       }
254     }
255     else
256     {
257       push @nodelist, $nodelist;
258     }
259   }
260   foreach my $nodename (@nodelist)
261   {
262     Log (undef, "node $nodename - $ncpus slots");
263     my $node = { name => $nodename,
264                  ncpus => $ncpus,
265                  losing_streak => 0,
266                  hold_until => 0 };
267     foreach my $cpu (1..$ncpus)
268     {
269       push @slot, { node => $node,
270                     cpu => $cpu };
271     }
272   }
273   push @node, @nodelist;
274 }
275
276
277
278 # Ensure that we get one jobstep running on each allocated node before
279 # we start overloading nodes with concurrent steps
280
281 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
282
283
284
285 my $jobmanager_id;
286 if ($job_has_uuid)
287 {
288   # Claim this job, and make sure nobody else does
289   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
290           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
291     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
292     exit EX_TEMPFAIL;
293   }
294   $Job->update_attributes('state' => 'Running',
295                           'tasks_summary' => { 'failed' => 0,
296                                                'todo' => 1,
297                                                'running' => 0,
298                                                'done' => 0 });
299 }
300
301
302 Log (undef, "start");
303 $SIG{'INT'} = sub { $main::please_freeze = 1; };
304 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
305 $SIG{'TERM'} = \&croak;
306 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
307 $SIG{'ALRM'} = sub { $main::please_info = 1; };
308 $SIG{'CONT'} = sub { $main::please_continue = 1; };
309 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
310
311 $main::please_freeze = 0;
312 $main::please_info = 0;
313 $main::please_continue = 0;
314 $main::please_refresh = 0;
315 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
316
317 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
318 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
319 $ENV{"JOB_UUID"} = $job_id;
320
321
322 my @jobstep;
323 my @jobstep_todo = ();
324 my @jobstep_done = ();
325 my @jobstep_tomerge = ();
326 my $jobstep_tomerge_level = 0;
327 my $squeue_checked;
328 my $squeue_kill_checked;
329 my $output_in_keep = 0;
330 my $latest_refresh = scalar time;
331
332
333
334 if (defined $Job->{thawedfromkey})
335 {
336   thaw ($Job->{thawedfromkey});
337 }
338 else
339 {
340   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
341     'job_uuid' => $Job->{'uuid'},
342     'sequence' => 0,
343     'qsequence' => 0,
344     'parameters' => {},
345                                                           });
346   push @jobstep, { 'level' => 0,
347                    'failures' => 0,
348                    'arvados_task' => $first_task,
349                  };
350   push @jobstep_todo, 0;
351 }
352
353
354 if (!$have_slurm)
355 {
356   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
357 }
358
359
360 my $build_script;
361
362
363 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
364
365 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
366 if ($skip_install)
367 {
368   if (!defined $no_clear_tmp) {
369     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
370     system($clear_tmp_cmd) == 0
371         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
372   }
373   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
374   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
375     if (-d $src_path) {
376       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
377           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
378       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
379           == 0
380           or croak ("setup.py in $src_path failed: exit ".($?>>8));
381     }
382   }
383 }
384 else
385 {
386   do {
387     local $/ = undef;
388     $build_script = <DATA>;
389   };
390   Log (undef, "Install revision ".$Job->{script_version});
391   my $nodelist = join(",", @node);
392
393   if (!defined $no_clear_tmp) {
394     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
395
396     my $cleanpid = fork();
397     if ($cleanpid == 0)
398     {
399       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
400             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
401       exit (1);
402     }
403     while (1)
404     {
405       last if $cleanpid == waitpid (-1, WNOHANG);
406       freeze_if_want_freeze ($cleanpid);
407       select (undef, undef, undef, 0.1);
408     }
409     Log (undef, "Clean-work-dir exited $?");
410   }
411
412   # Install requested code version
413
414   my @execargs;
415   my @srunargs = ("srun",
416                   "--nodelist=$nodelist",
417                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
418
419   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
420   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
421
422   my $commit;
423   my $git_archive;
424   my $treeish = $Job->{'script_version'};
425
426   # If we're running under crunch-dispatch, it will have pulled the
427   # appropriate source tree into its own repository, and given us that
428   # repo's path as $git_dir. If we're running a "local" job, and a
429   # script_version was specified, it's up to the user to provide the
430   # full path to a local repository in Job->{repository}.
431   #
432   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
433   # git-archive --remote where appropriate.
434   #
435   # TODO: Accept a locally-hosted Arvados repository by name or
436   # UUID. Use arvados.v1.repositories.list or .get to figure out the
437   # appropriate fetch-url.
438   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
439
440   $ENV{"CRUNCH_SRC_URL"} = $repo;
441
442   if (-d "$repo/.git") {
443     # We were given a working directory, but we are only interested in
444     # the index.
445     $repo = "$repo/.git";
446   }
447
448   # If this looks like a subversion r#, look for it in git-svn commit messages
449
450   if ($treeish =~ m{^\d{1,4}$}) {
451     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
452     chomp $gitlog;
453     Log(undef, "git Subversion search exited $?");
454     if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
455       $commit = $gitlog;
456       Log(undef, "Using commit $commit for Subversion revision $treeish");
457     }
458   }
459
460   # If that didn't work, try asking git to look it up as a tree-ish.
461
462   if (!defined $commit) {
463     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
464     chomp $found;
465     Log(undef, "git rev-list exited $? with result '$found'");
466     if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
467       $commit = $found;
468       Log(undef, "Using commit $commit for tree-ish $treeish");
469       if ($commit ne $treeish) {
470         # Make sure we record the real commit id in the database,
471         # frozentokey, logs, etc. -- instead of an abbreviation or a
472         # branch name which can become ambiguous or point to a
473         # different commit in the future.
474         $Job->{'script_version'} = $commit;
475         !$job_has_uuid or
476             $Job->update_attributes('script_version' => $commit) or
477             croak("Error while updating job");
478       }
479     }
480   }
481
482   if (defined $commit) {
483     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
484     @execargs = ("sh", "-c",
485                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
486     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
487     croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
488   }
489   else {
490     croak ("could not figure out commit id for $treeish");
491   }
492
493   # Note: this section is almost certainly unnecessary if we're
494   # running tasks in docker containers.
495   my $installpid = fork();
496   if ($installpid == 0)
497   {
498     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
499     exit (1);
500   }
501   while (1)
502   {
503     last if $installpid == waitpid (-1, WNOHANG);
504     freeze_if_want_freeze ($installpid);
505     select (undef, undef, undef, 0.1);
506   }
507   Log (undef, "Install exited $?");
508 }
509
510 if (!$have_slurm)
511 {
512   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
513   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
514 }
515
516 # If this job requires a Docker image, install that.
517 my $docker_bin = "/usr/bin/docker.io";
518 my ($docker_locator, $docker_stream, $docker_hash);
519 if ($docker_locator = $Job->{docker_image_locator}) {
520   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
521   if (!$docker_hash)
522   {
523     croak("No Docker image hash found from locator $docker_locator");
524   }
525   $docker_stream =~ s/^\.//;
526   my $docker_install_script = qq{
527 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
528     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
529 fi
530 };
531   my $docker_pid = fork();
532   if ($docker_pid == 0)
533   {
534     srun (["srun", "--nodelist=" . join(',', @node)],
535           ["/bin/sh", "-ec", $docker_install_script]);
536     exit ($?);
537   }
538   while (1)
539   {
540     last if $docker_pid == waitpid (-1, WNOHANG);
541     freeze_if_want_freeze ($docker_pid);
542     select (undef, undef, undef, 0.1);
543   }
544   if ($? != 0)
545   {
546     croak("Installing Docker image from $docker_locator returned exit code $?");
547   }
548 }
549
550 foreach (qw (script script_version script_parameters runtime_constraints))
551 {
552   Log (undef,
553        "$_ " .
554        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
555 }
556 foreach (split (/\n/, $Job->{knobs}))
557 {
558   Log (undef, "knob " . $_);
559 }
560
561
562
563 $main::success = undef;
564
565
566
567 ONELEVEL:
568
569 my $thisround_succeeded = 0;
570 my $thisround_failed = 0;
571 my $thisround_failed_multiple = 0;
572
573 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
574                        or $a <=> $b } @jobstep_todo;
575 my $level = $jobstep[$jobstep_todo[0]]->{level};
576 Log (undef, "start level $level");
577
578
579
580 my %proc;
581 my @freeslot = (0..$#slot);
582 my @holdslot;
583 my %reader;
584 my $progress_is_dirty = 1;
585 my $progress_stats_updated = 0;
586
587 update_progress_stats();
588
589
590
591 THISROUND:
592 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
593 {
594   my $id = $jobstep_todo[$todo_ptr];
595   my $Jobstep = $jobstep[$id];
596   if ($Jobstep->{level} != $level)
597   {
598     next;
599   }
600
601   pipe $reader{$id}, "writer" or croak ($!);
602   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
603   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
604
605   my $childslot = $freeslot[0];
606   my $childnode = $slot[$childslot]->{node};
607   my $childslotname = join (".",
608                             $slot[$childslot]->{node}->{name},
609                             $slot[$childslot]->{cpu});
610   my $childpid = fork();
611   if ($childpid == 0)
612   {
613     $SIG{'INT'} = 'DEFAULT';
614     $SIG{'QUIT'} = 'DEFAULT';
615     $SIG{'TERM'} = 'DEFAULT';
616
617     foreach (values (%reader))
618     {
619       close($_);
620     }
621     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
622     open(STDOUT,">&writer");
623     open(STDERR,">&writer");
624
625     undef $dbh;
626     undef $sth;
627
628     delete $ENV{"GNUPGHOME"};
629     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
630     $ENV{"TASK_QSEQUENCE"} = $id;
631     $ENV{"TASK_SEQUENCE"} = $level;
632     $ENV{"JOB_SCRIPT"} = $Job->{script};
633     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
634       $param =~ tr/a-z/A-Z/;
635       $ENV{"JOB_PARAMETER_$param"} = $value;
636     }
637     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
638     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
639     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
640     $ENV{"HOME"} = $ENV{"TASK_WORK"};
641     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
642     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
643     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
644     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
645
646     $ENV{"GZIP"} = "-n";
647
648     my @srunargs = (
649       "srun",
650       "--nodelist=".$childnode->{name},
651       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
652       "--job-name=$job_id.$id.$$",
653         );
654     my $build_script_to_send = "";
655     my $command =
656         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
657         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
658         ."&& cd $ENV{CRUNCH_TMP} ";
659     if ($build_script)
660     {
661       $build_script_to_send = $build_script;
662       $command .=
663           "&& perl -";
664     }
665     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
666     if ($docker_hash)
667     {
668       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
669       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
670       # Dynamically configure the container to use the host system as its
671       # DNS server.  Get the host's global addresses from the ip command,
672       # and turn them into docker --dns options using gawk.
673       $command .=
674           q{$(ip -o address show scope global |
675               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
676       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
677       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
678       $command .= "--env=\QHOME=/home/crunch\E ";
679       while (my ($env_key, $env_val) = each %ENV)
680       {
681         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
682           if ($env_key eq "TASK_WORK") {
683             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
684           }
685           elsif ($env_key eq "TASK_KEEPMOUNT") {
686             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
687           }
688           else {
689             $command .= "--env=\Q$env_key=$env_val\E ";
690           }
691         }
692       }
693       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
694       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
695       $command .= "\Q$docker_hash\E ";
696       $command .= "stdbuf --output=0 --error=0 ";
697       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
698     } else {
699       # Non-docker run
700       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
701       $command .= "stdbuf --output=0 --error=0 ";
702       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
703     }
704
705     my @execargs = ('bash', '-c', $command);
706     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
707     # exec() failed, we assume nothing happened.
708     Log(undef, "srun() failed on build script");
709     die;
710   }
711   close("writer");
712   if (!defined $childpid)
713   {
714     close $reader{$id};
715     delete $reader{$id};
716     next;
717   }
718   shift @freeslot;
719   $proc{$childpid} = { jobstep => $id,
720                        time => time,
721                        slot => $childslot,
722                        jobstepname => "$job_id.$id.$childpid",
723                      };
724   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
725   $slot[$childslot]->{pid} = $childpid;
726
727   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
728   Log ($id, "child $childpid started on $childslotname");
729   $Jobstep->{starttime} = time;
730   $Jobstep->{node} = $childnode->{name};
731   $Jobstep->{slotindex} = $childslot;
732   delete $Jobstep->{stderr};
733   delete $Jobstep->{finishtime};
734
735   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
736   $Jobstep->{'arvados_task'}->save;
737
738   splice @jobstep_todo, $todo_ptr, 1;
739   --$todo_ptr;
740
741   $progress_is_dirty = 1;
742
743   while (!@freeslot
744          ||
745          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
746   {
747     last THISROUND if $main::please_freeze;
748     if ($main::please_info)
749     {
750       $main::please_info = 0;
751       freeze();
752       collate_output();
753       save_meta(1);
754       update_progress_stats();
755     }
756     my $gotsome
757         = readfrompipes ()
758         + reapchildren ();
759     if (!$gotsome)
760     {
761       check_refresh_wanted();
762       check_squeue();
763       update_progress_stats();
764       select (undef, undef, undef, 0.1);
765     }
766     elsif (time - $progress_stats_updated >= 30)
767     {
768       update_progress_stats();
769     }
770     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
771         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
772     {
773       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
774           .($thisround_failed+$thisround_succeeded)
775           .") -- giving up on this round";
776       Log (undef, $message);
777       last THISROUND;
778     }
779
780     # move slots from freeslot to holdslot (or back to freeslot) if necessary
781     for (my $i=$#freeslot; $i>=0; $i--) {
782       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
783         push @holdslot, (splice @freeslot, $i, 1);
784       }
785     }
786     for (my $i=$#holdslot; $i>=0; $i--) {
787       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
788         push @freeslot, (splice @holdslot, $i, 1);
789       }
790     }
791
792     # give up if no nodes are succeeding
793     if (!grep { $_->{node}->{losing_streak} == 0 &&
794                     $_->{node}->{hold_count} < 4 } @slot) {
795       my $message = "Every node has failed -- giving up on this round";
796       Log (undef, $message);
797       last THISROUND;
798     }
799   }
800 }
801
802
803 push @freeslot, splice @holdslot;
804 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
805
806
807 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
808 while (%proc)
809 {
810   if ($main::please_continue) {
811     $main::please_continue = 0;
812     goto THISROUND;
813   }
814   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
815   readfrompipes ();
816   if (!reapchildren())
817   {
818     check_refresh_wanted();
819     check_squeue();
820     update_progress_stats();
821     select (undef, undef, undef, 0.1);
822     killem (keys %proc) if $main::please_freeze;
823   }
824 }
825
826 update_progress_stats();
827 freeze_if_want_freeze();
828
829
830 if (!defined $main::success)
831 {
832   if (@jobstep_todo &&
833       $thisround_succeeded == 0 &&
834       ($thisround_failed == 0 || $thisround_failed > 4))
835   {
836     my $message = "stop because $thisround_failed tasks failed and none succeeded";
837     Log (undef, $message);
838     $main::success = 0;
839   }
840   if (!@jobstep_todo)
841   {
842     $main::success = 1;
843   }
844 }
845
846 goto ONELEVEL if !defined $main::success;
847
848
849 release_allocation();
850 freeze();
851 my $collated_output = &collate_output();
852
853 if (!$collated_output) {
854   Log(undef, "output undef");
855 }
856 else {
857   eval {
858     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
859         or die "failed to get collated manifest: $!";
860     my $orig_manifest_text = '';
861     while (my $manifest_line = <$orig_manifest>) {
862       $orig_manifest_text .= $manifest_line;
863     }
864     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
865       'manifest_text' => $orig_manifest_text,
866     });
867     Log(undef, "output uuid " . $output->{uuid});
868     Log(undef, "output hash " . $output->{portable_data_hash});
869     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
870   };
871   if ($@) {
872     Log (undef, "Failed to register output manifest: $@");
873   }
874 }
875
876 Log (undef, "finish");
877
878 save_meta();
879
880 if ($job_has_uuid) {
881   if ($collated_output && $main::success) {
882     $Job->update_attributes('state' => 'Complete')
883   } else {
884     $Job->update_attributes('state' => 'Failed')
885   }
886 }
887
888 exit ($Job->{'state'} != 'Complete' ? 1 : 0);
889
890
891
892 sub update_progress_stats
893 {
894   $progress_stats_updated = time;
895   return if !$progress_is_dirty;
896   my ($todo, $done, $running) = (scalar @jobstep_todo,
897                                  scalar @jobstep_done,
898                                  scalar @slot - scalar @freeslot - scalar @holdslot);
899   $Job->{'tasks_summary'} ||= {};
900   $Job->{'tasks_summary'}->{'todo'} = $todo;
901   $Job->{'tasks_summary'}->{'done'} = $done;
902   $Job->{'tasks_summary'}->{'running'} = $running;
903   if ($job_has_uuid) {
904     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
905   }
906   Log (undef, "status: $done done, $running running, $todo todo");
907   $progress_is_dirty = 0;
908 }
909
910
911
912 sub reapchildren
913 {
914   my $pid = waitpid (-1, WNOHANG);
915   return 0 if $pid <= 0;
916
917   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
918                   . "."
919                   . $slot[$proc{$pid}->{slot}]->{cpu});
920   my $jobstepid = $proc{$pid}->{jobstep};
921   my $elapsed = time - $proc{$pid}->{time};
922   my $Jobstep = $jobstep[$jobstepid];
923
924   my $childstatus = $?;
925   my $exitvalue = $childstatus >> 8;
926   my $exitinfo = sprintf("exit %d signal %d%s",
927                          $exitvalue,
928                          $childstatus & 127,
929                          ($childstatus & 128 ? ' core dump' : ''));
930   $Jobstep->{'arvados_task'}->reload;
931   my $task_success = $Jobstep->{'arvados_task'}->{success};
932
933   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
934
935   if (!defined $task_success) {
936     # task did not indicate one way or the other --> fail
937     $Jobstep->{'arvados_task'}->{success} = 0;
938     $Jobstep->{'arvados_task'}->save;
939     $task_success = 0;
940   }
941
942   if (!$task_success)
943   {
944     my $temporary_fail;
945     $temporary_fail ||= $Jobstep->{node_fail};
946     $temporary_fail ||= ($exitvalue == 111);
947
948     ++$thisround_failed;
949     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
950
951     # Check for signs of a failed or misconfigured node
952     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
953         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
954       # Don't count this against jobstep failure thresholds if this
955       # node is already suspected faulty and srun exited quickly
956       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
957           $elapsed < 5) {
958         Log ($jobstepid, "blaming failure on suspect node " .
959              $slot[$proc{$pid}->{slot}]->{node}->{name});
960         $temporary_fail ||= 1;
961       }
962       ban_node_by_slot($proc{$pid}->{slot});
963     }
964
965     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
966                              ++$Jobstep->{'failures'},
967                              $temporary_fail ? 'temporary ' : 'permanent',
968                              $elapsed));
969
970     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
971       # Give up on this task, and the whole job
972       $main::success = 0;
973       $main::please_freeze = 1;
974     }
975     else {
976       # Put this task back on the todo queue
977       push @jobstep_todo, $jobstepid;
978     }
979     $Job->{'tasks_summary'}->{'failed'}++;
980   }
981   else
982   {
983     ++$thisround_succeeded;
984     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
985     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
986     push @jobstep_done, $jobstepid;
987     Log ($jobstepid, "success in $elapsed seconds");
988   }
989   $Jobstep->{exitcode} = $childstatus;
990   $Jobstep->{finishtime} = time;
991   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
992   $Jobstep->{'arvados_task'}->save;
993   process_stderr ($jobstepid, $task_success);
994   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
995
996   close $reader{$jobstepid};
997   delete $reader{$jobstepid};
998   delete $slot[$proc{$pid}->{slot}]->{pid};
999   push @freeslot, $proc{$pid}->{slot};
1000   delete $proc{$pid};
1001
1002   if ($task_success) {
1003     # Load new tasks
1004     my $newtask_list = [];
1005     my $newtask_results;
1006     do {
1007       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1008         'where' => {
1009           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1010         },
1011         'order' => 'qsequence',
1012         'offset' => scalar(@$newtask_list),
1013       );
1014       push(@$newtask_list, @{$newtask_results->{items}});
1015     } while (@{$newtask_results->{items}});
1016     foreach my $arvados_task (@$newtask_list) {
1017       my $jobstep = {
1018         'level' => $arvados_task->{'sequence'},
1019         'failures' => 0,
1020         'arvados_task' => $arvados_task
1021       };
1022       push @jobstep, $jobstep;
1023       push @jobstep_todo, $#jobstep;
1024     }
1025   }
1026
1027   $progress_is_dirty = 1;
1028   1;
1029 }
1030
1031 sub check_refresh_wanted
1032 {
1033   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1034   if (@stat && $stat[9] > $latest_refresh) {
1035     $latest_refresh = scalar time;
1036     if ($job_has_uuid) {
1037       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1038       for my $attr ('cancelled_at',
1039                     'cancelled_by_user_uuid',
1040                     'cancelled_by_client_uuid',
1041                     'state') {
1042         $Job->{$attr} = $Job2->{$attr};
1043       }
1044       if ($Job->{'state'} ne "Running") {
1045         if ($Job->{'state'} eq "Cancelled") {
1046           Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1047         } else {
1048           Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1049         }
1050         $main::success = 0;
1051         $main::please_freeze = 1;
1052       }
1053     }
1054   }
1055 }
1056
1057 sub check_squeue
1058 {
1059   # return if the kill list was checked <4 seconds ago
1060   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1061   {
1062     return;
1063   }
1064   $squeue_kill_checked = time;
1065
1066   # use killem() on procs whose killtime is reached
1067   for (keys %proc)
1068   {
1069     if (exists $proc{$_}->{killtime}
1070         && $proc{$_}->{killtime} <= time)
1071     {
1072       killem ($_);
1073     }
1074   }
1075
1076   # return if the squeue was checked <60 seconds ago
1077   if (defined $squeue_checked && $squeue_checked > time - 60)
1078   {
1079     return;
1080   }
1081   $squeue_checked = time;
1082
1083   if (!$have_slurm)
1084   {
1085     # here is an opportunity to check for mysterious problems with local procs
1086     return;
1087   }
1088
1089   # get a list of steps still running
1090   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1091   chop @squeue;
1092   if ($squeue[-1] ne "ok")
1093   {
1094     return;
1095   }
1096   pop @squeue;
1097
1098   # which of my jobsteps are running, according to squeue?
1099   my %ok;
1100   foreach (@squeue)
1101   {
1102     if (/^(\d+)\.(\d+) (\S+)/)
1103     {
1104       if ($1 eq $ENV{SLURM_JOBID})
1105       {
1106         $ok{$3} = 1;
1107       }
1108     }
1109   }
1110
1111   # which of my active child procs (>60s old) were not mentioned by squeue?
1112   foreach (keys %proc)
1113   {
1114     if ($proc{$_}->{time} < time - 60
1115         && !exists $ok{$proc{$_}->{jobstepname}}
1116         && !exists $proc{$_}->{killtime})
1117     {
1118       # kill this proc if it hasn't exited in 30 seconds
1119       $proc{$_}->{killtime} = time + 30;
1120     }
1121   }
1122 }
1123
1124
1125 sub release_allocation
1126 {
1127   if ($have_slurm)
1128   {
1129     Log (undef, "release job allocation");
1130     system "scancel $ENV{SLURM_JOBID}";
1131   }
1132 }
1133
1134
1135 sub readfrompipes
1136 {
1137   my $gotsome = 0;
1138   foreach my $job (keys %reader)
1139   {
1140     my $buf;
1141     while (0 < sysread ($reader{$job}, $buf, 8192))
1142     {
1143       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1144       $jobstep[$job]->{stderr} .= $buf;
1145       preprocess_stderr ($job);
1146       if (length ($jobstep[$job]->{stderr}) > 16384)
1147       {
1148         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1149       }
1150       $gotsome = 1;
1151     }
1152   }
1153   return $gotsome;
1154 }
1155
1156
1157 sub preprocess_stderr
1158 {
1159   my $job = shift;
1160
1161   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1162     my $line = $1;
1163     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1164     Log ($job, "stderr $line");
1165     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1166       # whoa.
1167       $main::please_freeze = 1;
1168     }
1169     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1170       $jobstep[$job]->{node_fail} = 1;
1171       ban_node_by_slot($jobstep[$job]->{slotindex});
1172     }
1173   }
1174 }
1175
1176
1177 sub process_stderr
1178 {
1179   my $job = shift;
1180   my $task_success = shift;
1181   preprocess_stderr ($job);
1182
1183   map {
1184     Log ($job, "stderr $_");
1185   } split ("\n", $jobstep[$job]->{stderr});
1186 }
1187
1188 sub fetch_block
1189 {
1190   my $hash = shift;
1191   my ($keep, $child_out, $output_block);
1192
1193   my $cmd = "arv-get \Q$hash\E";
1194   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1195   $output_block = '';
1196   while (1) {
1197     my $buf;
1198     my $bytes = sysread($keep, $buf, 1024 * 1024);
1199     if (!defined $bytes) {
1200       die "reading from arv-get: $!";
1201     } elsif ($bytes == 0) {
1202       # sysread returns 0 at the end of the pipe.
1203       last;
1204     } else {
1205       # some bytes were read into buf.
1206       $output_block .= $buf;
1207     }
1208   }
1209   close $keep;
1210   return $output_block;
1211 }
1212
1213 sub collate_output
1214 {
1215   Log (undef, "collate");
1216
1217   my ($child_out, $child_in);
1218   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1219                   '--retries', put_retry_count());
1220   my $joboutput;
1221   for (@jobstep)
1222   {
1223     next if (!exists $_->{'arvados_task'}->{'output'} ||
1224              !$_->{'arvados_task'}->{'success'});
1225     my $output = $_->{'arvados_task'}->{output};
1226     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1227     {
1228       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1229       print $child_in $output;
1230     }
1231     elsif (@jobstep == 1)
1232     {
1233       $joboutput = $output;
1234       last;
1235     }
1236     elsif (defined (my $outblock = fetch_block ($output)))
1237     {
1238       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1239       print $child_in $outblock;
1240     }
1241     else
1242     {
1243       Log (undef, "XXX fetch_block($output) failed XXX");
1244       $main::success = 0;
1245     }
1246   }
1247   $child_in->close;
1248
1249   if (!defined $joboutput) {
1250     my $s = IO::Select->new($child_out);
1251     if ($s->can_read(120)) {
1252       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1253       chomp($joboutput);
1254     } else {
1255       Log (undef, "timed out reading from 'arv-put'");
1256     }
1257   }
1258   waitpid($pid, 0);
1259
1260   return $joboutput;
1261 }
1262
1263
1264 sub killem
1265 {
1266   foreach (@_)
1267   {
1268     my $sig = 2;                # SIGINT first
1269     if (exists $proc{$_}->{"sent_$sig"} &&
1270         time - $proc{$_}->{"sent_$sig"} > 4)
1271     {
1272       $sig = 15;                # SIGTERM if SIGINT doesn't work
1273     }
1274     if (exists $proc{$_}->{"sent_$sig"} &&
1275         time - $proc{$_}->{"sent_$sig"} > 4)
1276     {
1277       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1278     }
1279     if (!exists $proc{$_}->{"sent_$sig"})
1280     {
1281       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1282       kill $sig, $_;
1283       select (undef, undef, undef, 0.1);
1284       if ($sig == 2)
1285       {
1286         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1287       }
1288       $proc{$_}->{"sent_$sig"} = time;
1289       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1290     }
1291   }
1292 }
1293
1294
1295 sub fhbits
1296 {
1297   my($bits);
1298   for (@_) {
1299     vec($bits,fileno($_),1) = 1;
1300   }
1301   $bits;
1302 }
1303
1304
1305 sub Log                         # ($jobstep_id, $logmessage)
1306 {
1307   if ($_[1] =~ /\n/) {
1308     for my $line (split (/\n/, $_[1])) {
1309       Log ($_[0], $line);
1310     }
1311     return;
1312   }
1313   my $fh = select STDERR; $|=1; select $fh;
1314   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1315   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1316   $message .= "\n";
1317   my $datetime;
1318   if ($local_logfile || -t STDERR) {
1319     my @gmtime = gmtime;
1320     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1321                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1322   }
1323   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1324
1325   if ($local_logfile) {
1326     print $local_logfile $datetime . " " . $message;
1327   }
1328 }
1329
1330
1331 sub croak
1332 {
1333   my ($package, $file, $line) = caller;
1334   my $message = "@_ at $file line $line\n";
1335   Log (undef, $message);
1336   freeze() if @jobstep_todo;
1337   collate_output() if @jobstep_todo;
1338   cleanup();
1339   save_meta() if $local_logfile;
1340   die;
1341 }
1342
1343
1344 sub cleanup
1345 {
1346   return if !$job_has_uuid;
1347   if ($Job->{'state'} eq 'Cancelled') {
1348     $Job->update_attributes('finished_at' => scalar gmtime);
1349   } else {
1350     $Job->update_attributes('state' => 'Failed');
1351   }
1352 }
1353
1354
1355 sub save_meta
1356 {
1357   my $justcheckpoint = shift; # false if this will be the last meta saved
1358   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1359
1360   $local_logfile->flush;
1361   my $retry_count = put_retry_count();
1362   my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1363       "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1364   my $loglocator = `$cmd`;
1365   die "system $cmd failed: $?" if $?;
1366   chomp($loglocator);
1367
1368   $local_logfile = undef;   # the temp file is automatically deleted
1369   Log (undef, "log manifest is $loglocator");
1370   $Job->{'log'} = $loglocator;
1371   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1372 }
1373
1374
1375 sub freeze_if_want_freeze
1376 {
1377   if ($main::please_freeze)
1378   {
1379     release_allocation();
1380     if (@_)
1381     {
1382       # kill some srun procs before freeze+stop
1383       map { $proc{$_} = {} } @_;
1384       while (%proc)
1385       {
1386         killem (keys %proc);
1387         select (undef, undef, undef, 0.1);
1388         my $died;
1389         while (($died = waitpid (-1, WNOHANG)) > 0)
1390         {
1391           delete $proc{$died};
1392         }
1393       }
1394     }
1395     freeze();
1396     collate_output();
1397     cleanup();
1398     save_meta();
1399     exit 0;
1400   }
1401 }
1402
1403
1404 sub freeze
1405 {
1406   Log (undef, "Freeze not implemented");
1407   return;
1408 }
1409
1410
1411 sub thaw
1412 {
1413   croak ("Thaw not implemented");
1414 }
1415
1416
1417 sub freezequote
1418 {
1419   my $s = shift;
1420   $s =~ s/\\/\\\\/g;
1421   $s =~ s/\n/\\n/g;
1422   return $s;
1423 }
1424
1425
1426 sub freezeunquote
1427 {
1428   my $s = shift;
1429   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1430   return $s;
1431 }
1432
1433
1434 sub srun
1435 {
1436   my $srunargs = shift;
1437   my $execargs = shift;
1438   my $opts = shift || {};
1439   my $stdin = shift;
1440   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1441   print STDERR (join (" ",
1442                       map { / / ? "'$_'" : $_ }
1443                       (@$args)),
1444                 "\n")
1445       if $ENV{CRUNCH_DEBUG};
1446
1447   if (defined $stdin) {
1448     my $child = open STDIN, "-|";
1449     defined $child or die "no fork: $!";
1450     if ($child == 0) {
1451       print $stdin or die $!;
1452       close STDOUT or die $!;
1453       exit 0;
1454     }
1455   }
1456
1457   return system (@$args) if $opts->{fork};
1458
1459   exec @$args;
1460   warn "ENV size is ".length(join(" ",%ENV));
1461   die "exec failed: $!: @$args";
1462 }
1463
1464
1465 sub ban_node_by_slot {
1466   # Don't start any new jobsteps on this node for 60 seconds
1467   my $slotid = shift;
1468   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1469   $slot[$slotid]->{node}->{hold_count}++;
1470   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1471 }
1472
1473 sub must_lock_now
1474 {
1475   my ($lockfile, $error_message) = @_;
1476   open L, ">", $lockfile or croak("$lockfile: $!");
1477   if (!flock L, LOCK_EX|LOCK_NB) {
1478     croak("Can't lock $lockfile: $error_message\n");
1479   }
1480 }
1481
1482 sub find_docker_image {
1483   # Given a Keep locator, check to see if it contains a Docker image.
1484   # If so, return its stream name and Docker hash.
1485   # If not, return undef for both values.
1486   my $locator = shift;
1487   my ($streamname, $filename);
1488   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1489     foreach my $line (split(/\n/, $image->{manifest_text})) {
1490       my @tokens = split(/\s+/, $line);
1491       next if (!@tokens);
1492       $streamname = shift(@tokens);
1493       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1494         if (defined($filename)) {
1495           return (undef, undef);  # More than one file in the Collection.
1496         } else {
1497           $filename = (split(/:/, $filedata, 3))[2];
1498         }
1499       }
1500     }
1501   }
1502   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1503     return ($streamname, $1);
1504   } else {
1505     return (undef, undef);
1506   }
1507 }
1508
1509 sub put_retry_count {
1510   # Calculate a --retries argument for arv-put that will have it try
1511   # approximately as long as this Job has been running.
1512   my $stoptime = shift || time;
1513   my $starttime = $jobstep[0]->{starttime};
1514   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1515   my $retries = 0;
1516   while ($timediff >= 2) {
1517     $retries++;
1518     $timediff /= 2;
1519   }
1520   return ($retries > 3) ? $retries : 3;
1521 }
1522
1523 __DATA__
1524 #!/usr/bin/perl
1525
1526 # checkout-and-build
1527
1528 use Fcntl ':flock';
1529 use File::Path qw( make_path );
1530
1531 my $destdir = $ENV{"CRUNCH_SRC"};
1532 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1533 my $repo = $ENV{"CRUNCH_SRC_URL"};
1534 my $task_work = $ENV{"TASK_WORK"};
1535
1536 for my $dir ($destdir, $task_work) {
1537     if ($dir) {
1538         make_path $dir;
1539         -e $dir or die "Failed to create temporary directory ($dir): $!";
1540     }
1541 }
1542
1543 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1544 flock L, LOCK_EX;
1545 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1546     if (@ARGV) {
1547         exec(@ARGV);
1548         die "Cannot exec `@ARGV`: $!";
1549     } else {
1550         exit 0;
1551     }
1552 }
1553
1554 unlink "$destdir.commit";
1555 open STDOUT, ">", "$destdir.log";
1556 open STDERR, ">&STDOUT";
1557
1558 mkdir $destdir;
1559 my @git_archive_data = <DATA>;
1560 if (@git_archive_data) {
1561   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1562   print TARX @git_archive_data;
1563   if(!close(TARX)) {
1564     die "'tar -C $destdir -xf -' exited $?: $!";
1565   }
1566 }
1567
1568 my $pwd;
1569 chomp ($pwd = `pwd`);
1570 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1571 mkdir $install_dir;
1572
1573 for my $src_path ("$destdir/arvados/sdk/python") {
1574   if (-d $src_path) {
1575     shell_or_die ("virtualenv", $install_dir);
1576     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1577   }
1578 }
1579
1580 if (-e "$destdir/crunch_scripts/install") {
1581     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1582 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1583     # Old version
1584     shell_or_die ("./tests/autotests.sh", $install_dir);
1585 } elsif (-e "./install.sh") {
1586     shell_or_die ("./install.sh", $install_dir);
1587 }
1588
1589 if ($commit) {
1590     unlink "$destdir.commit.new";
1591     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1592     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1593 }
1594
1595 close L;
1596
1597 if (@ARGV) {
1598     exec(@ARGV);
1599     die "Cannot exec `@ARGV`: $!";
1600 } else {
1601     exit 0;
1602 }
1603
1604 sub shell_or_die
1605 {
1606   if ($ENV{"DEBUG"}) {
1607     print STDERR "@_\n";
1608   }
1609   system (@_) == 0
1610       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1611 }
1612
1613 __DATA__