3775: Remove stagnant $job_has_uuid flag. Every job has a uuid. The
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $local_job = 0;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job;
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($jobspec =~ /^[-a-z\d]+$/)
152 {
153   # $jobspec is an Arvados UUID, not a JSON job specification
154   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155   if (!$force_unlock) {
156     # If some other crunch-job process has grabbed this job (or we see
157     # other evidence that the job is already underway) we exit
158     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159     # mark the job as failed.
160     if ($Job->{'is_locked_by_uuid'}) {
161       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
162       exit EX_TEMPFAIL;
163     }
164     if ($Job->{'state'} ne 'Queued') {
165       Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
166       exit EX_TEMPFAIL;
167     }
168     if ($Job->{'success'} ne undef) {
169       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
170       exit EX_TEMPFAIL;
171     }
172     if ($Job->{'running'}) {
173       Log(undef, "Job 'running' flag is already set");
174       exit EX_TEMPFAIL;
175     }
176     if ($Job->{'started_at'}) {
177       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
178       exit EX_TEMPFAIL;
179     }
180   }
181 }
182 else
183 {
184   $Job = JSON::decode_json($jobspec);
185
186   if (!$resume_stash)
187   {
188     map { croak ("No $_ specified") unless $Job->{$_} }
189     qw(script script_version script_parameters);
190   }
191
192   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
193   $Job->{'started_at'} = gmtime;
194
195   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
196 }
197 $job_id = $Job->{'uuid'};
198
199 my $keep_logfile = $job_id . '.log.txt';
200 $local_logfile = File::Temp->new();
201
202 $Job->{'runtime_constraints'} ||= {};
203 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
204 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
205
206
207 Log (undef, "check slurm allocation");
208 my @slot;
209 my @node;
210 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
211 my @sinfo;
212 if (!$have_slurm)
213 {
214   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
215   push @sinfo, "$localcpus localhost";
216 }
217 if (exists $ENV{SLURM_NODELIST})
218 {
219   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
220 }
221 foreach (@sinfo)
222 {
223   my ($ncpus, $slurm_nodelist) = split;
224   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
225
226   my @nodelist;
227   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
228   {
229     my $nodelist = $1;
230     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
231     {
232       my $ranges = $1;
233       foreach (split (",", $ranges))
234       {
235         my ($a, $b);
236         if (/(\d+)-(\d+)/)
237         {
238           $a = $1;
239           $b = $2;
240         }
241         else
242         {
243           $a = $_;
244           $b = $_;
245         }
246         push @nodelist, map {
247           my $n = $nodelist;
248           $n =~ s/\[[-,\d]+\]/$_/;
249           $n;
250         } ($a..$b);
251       }
252     }
253     else
254     {
255       push @nodelist, $nodelist;
256     }
257   }
258   foreach my $nodename (@nodelist)
259   {
260     Log (undef, "node $nodename - $ncpus slots");
261     my $node = { name => $nodename,
262                  ncpus => $ncpus,
263                  losing_streak => 0,
264                  hold_until => 0 };
265     foreach my $cpu (1..$ncpus)
266     {
267       push @slot, { node => $node,
268                     cpu => $cpu };
269     }
270   }
271   push @node, @nodelist;
272 }
273
274
275
276 # Ensure that we get one jobstep running on each allocated node before
277 # we start overloading nodes with concurrent steps
278
279 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
280
281
282
283 my $jobmanager_id;
284 # Claim this job, and make sure nobody else does
285 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
286         $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
287   Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
288   exit EX_TEMPFAIL;
289 }
290 $Job->update_attributes('state' => 'Running',
291                         'tasks_summary' => { 'failed' => 0,
292                                              'todo' => 1,
293                                              'running' => 0,
294                                              'done' => 0 });
295
296
297 Log (undef, "start");
298 $SIG{'INT'} = sub { $main::please_freeze = 1; };
299 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
300 $SIG{'TERM'} = \&croak;
301 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
302 $SIG{'ALRM'} = sub { $main::please_info = 1; };
303 $SIG{'CONT'} = sub { $main::please_continue = 1; };
304 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
305
306 $main::please_freeze = 0;
307 $main::please_info = 0;
308 $main::please_continue = 0;
309 $main::please_refresh = 0;
310 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
311
312 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
313 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
314 $ENV{"JOB_UUID"} = $job_id;
315
316
317 my @jobstep;
318 my @jobstep_todo = ();
319 my @jobstep_done = ();
320 my @jobstep_tomerge = ();
321 my $jobstep_tomerge_level = 0;
322 my $squeue_checked;
323 my $squeue_kill_checked;
324 my $output_in_keep = 0;
325 my $latest_refresh = scalar time;
326
327
328
329 if (defined $Job->{thawedfromkey})
330 {
331   thaw ($Job->{thawedfromkey});
332 }
333 else
334 {
335   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
336     'job_uuid' => $Job->{'uuid'},
337     'sequence' => 0,
338     'qsequence' => 0,
339     'parameters' => {},
340                                                           });
341   push @jobstep, { 'level' => 0,
342                    'failures' => 0,
343                    'arvados_task' => $first_task,
344                  };
345   push @jobstep_todo, 0;
346 }
347
348
349 if (!$have_slurm)
350 {
351   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
352 }
353
354
355 my $build_script;
356
357
358 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
359
360 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
361 if ($skip_install)
362 {
363   if (!defined $no_clear_tmp) {
364     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
365     system($clear_tmp_cmd) == 0
366         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
367   }
368   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
369   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
370     if (-d $src_path) {
371       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
372           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
373       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
374           == 0
375           or croak ("setup.py in $src_path failed: exit ".($?>>8));
376     }
377   }
378 }
379 else
380 {
381   do {
382     local $/ = undef;
383     $build_script = <DATA>;
384   };
385   Log (undef, "Install revision ".$Job->{script_version});
386   my $nodelist = join(",", @node);
387
388   if (!defined $no_clear_tmp) {
389     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
390
391     my $cleanpid = fork();
392     if ($cleanpid == 0)
393     {
394       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
395             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
396       exit (1);
397     }
398     while (1)
399     {
400       last if $cleanpid == waitpid (-1, WNOHANG);
401       freeze_if_want_freeze ($cleanpid);
402       select (undef, undef, undef, 0.1);
403     }
404     Log (undef, "Clean-work-dir exited $?");
405   }
406
407   # Install requested code version
408
409   my @execargs;
410   my @srunargs = ("srun",
411                   "--nodelist=$nodelist",
412                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
413
414   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
415   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
416
417   my $commit;
418   my $git_archive;
419   my $treeish = $Job->{'script_version'};
420
421   # If we're running under crunch-dispatch, it will have pulled the
422   # appropriate source tree into its own repository, and given us that
423   # repo's path as $git_dir. If we're running a "local" job, and a
424   # script_version was specified, it's up to the user to provide the
425   # full path to a local repository in Job->{repository}.
426   #
427   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
428   # git-archive --remote where appropriate.
429   #
430   # TODO: Accept a locally-hosted Arvados repository by name or
431   # UUID. Use arvados.v1.repositories.list or .get to figure out the
432   # appropriate fetch-url.
433   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
434
435   $ENV{"CRUNCH_SRC_URL"} = $repo;
436
437   if (-d "$repo/.git") {
438     # We were given a working directory, but we are only interested in
439     # the index.
440     $repo = "$repo/.git";
441   }
442
443   # If this looks like a subversion r#, look for it in git-svn commit messages
444
445   if ($treeish =~ m{^\d{1,4}$}) {
446     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
447     chomp $gitlog;
448     Log(undef, "git Subversion search exited $?");
449     if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
450       $commit = $gitlog;
451       Log(undef, "Using commit $commit for Subversion revision $treeish");
452     }
453   }
454
455   # If that didn't work, try asking git to look it up as a tree-ish.
456
457   if (!defined $commit) {
458     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
459     chomp $found;
460     Log(undef, "git rev-list exited $? with result '$found'");
461     if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
462       $commit = $found;
463       Log(undef, "Using commit $commit for tree-ish $treeish");
464       if ($commit ne $treeish) {
465         # Make sure we record the real commit id in the database,
466         # frozentokey, logs, etc. -- instead of an abbreviation or a
467         # branch name which can become ambiguous or point to a
468         # different commit in the future.
469         $Job->{'script_version'} = $commit;
470         !$job_has_uuid or
471             $Job->update_attributes('script_version' => $commit) or
472             croak("Error while updating job");
473       }
474     }
475   }
476
477   if (defined $commit) {
478     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
479     @execargs = ("sh", "-c",
480                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
481     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
482     croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
483   }
484   else {
485     croak ("could not figure out commit id for $treeish");
486   }
487
488   # Note: this section is almost certainly unnecessary if we're
489   # running tasks in docker containers.
490   my $installpid = fork();
491   if ($installpid == 0)
492   {
493     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
494     exit (1);
495   }
496   while (1)
497   {
498     last if $installpid == waitpid (-1, WNOHANG);
499     freeze_if_want_freeze ($installpid);
500     select (undef, undef, undef, 0.1);
501   }
502   Log (undef, "Install exited $?");
503 }
504
505 if (!$have_slurm)
506 {
507   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
508   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
509 }
510
511 # If this job requires a Docker image, install that.
512 my $docker_bin = "/usr/bin/docker.io";
513 my ($docker_locator, $docker_stream, $docker_hash);
514 if ($docker_locator = $Job->{docker_image_locator}) {
515   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
516   if (!$docker_hash)
517   {
518     croak("No Docker image hash found from locator $docker_locator");
519   }
520   $docker_stream =~ s/^\.//;
521   my $docker_install_script = qq{
522 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
523     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
524 fi
525 };
526   my $docker_pid = fork();
527   if ($docker_pid == 0)
528   {
529     srun (["srun", "--nodelist=" . join(',', @node)],
530           ["/bin/sh", "-ec", $docker_install_script]);
531     exit ($?);
532   }
533   while (1)
534   {
535     last if $docker_pid == waitpid (-1, WNOHANG);
536     freeze_if_want_freeze ($docker_pid);
537     select (undef, undef, undef, 0.1);
538   }
539   if ($? != 0)
540   {
541     croak("Installing Docker image from $docker_locator returned exit code $?");
542   }
543 }
544
545 foreach (qw (script script_version script_parameters runtime_constraints))
546 {
547   Log (undef,
548        "$_ " .
549        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
550 }
551 foreach (split (/\n/, $Job->{knobs}))
552 {
553   Log (undef, "knob " . $_);
554 }
555
556
557
558 $main::success = undef;
559
560
561
562 ONELEVEL:
563
564 my $thisround_succeeded = 0;
565 my $thisround_failed = 0;
566 my $thisround_failed_multiple = 0;
567
568 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
569                        or $a <=> $b } @jobstep_todo;
570 my $level = $jobstep[$jobstep_todo[0]]->{level};
571 Log (undef, "start level $level");
572
573
574
575 my %proc;
576 my @freeslot = (0..$#slot);
577 my @holdslot;
578 my %reader;
579 my $progress_is_dirty = 1;
580 my $progress_stats_updated = 0;
581
582 update_progress_stats();
583
584
585
586 THISROUND:
587 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
588 {
589   my $id = $jobstep_todo[$todo_ptr];
590   my $Jobstep = $jobstep[$id];
591   if ($Jobstep->{level} != $level)
592   {
593     next;
594   }
595
596   pipe $reader{$id}, "writer" or croak ($!);
597   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
598   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
599
600   my $childslot = $freeslot[0];
601   my $childnode = $slot[$childslot]->{node};
602   my $childslotname = join (".",
603                             $slot[$childslot]->{node}->{name},
604                             $slot[$childslot]->{cpu});
605   my $childpid = fork();
606   if ($childpid == 0)
607   {
608     $SIG{'INT'} = 'DEFAULT';
609     $SIG{'QUIT'} = 'DEFAULT';
610     $SIG{'TERM'} = 'DEFAULT';
611
612     foreach (values (%reader))
613     {
614       close($_);
615     }
616     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
617     open(STDOUT,">&writer");
618     open(STDERR,">&writer");
619
620     undef $dbh;
621     undef $sth;
622
623     delete $ENV{"GNUPGHOME"};
624     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
625     $ENV{"TASK_QSEQUENCE"} = $id;
626     $ENV{"TASK_SEQUENCE"} = $level;
627     $ENV{"JOB_SCRIPT"} = $Job->{script};
628     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
629       $param =~ tr/a-z/A-Z/;
630       $ENV{"JOB_PARAMETER_$param"} = $value;
631     }
632     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
633     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
634     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
635     $ENV{"HOME"} = $ENV{"TASK_WORK"};
636     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
637     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
638     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
639     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
640
641     $ENV{"GZIP"} = "-n";
642
643     my @srunargs = (
644       "srun",
645       "--nodelist=".$childnode->{name},
646       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
647       "--job-name=$job_id.$id.$$",
648         );
649     my $build_script_to_send = "";
650     my $command =
651         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
652         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
653         ."&& cd $ENV{CRUNCH_TMP} ";
654     if ($build_script)
655     {
656       $build_script_to_send = $build_script;
657       $command .=
658           "&& perl -";
659     }
660     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
661     if ($docker_hash)
662     {
663       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665       # Dynamically configure the container to use the host system as its
666       # DNS server.  Get the host's global addresses from the ip command,
667       # and turn them into docker --dns options using gawk.
668       $command .=
669           q{$(ip -o address show scope global |
670               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
672       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
673       $command .= "--env=\QHOME=/home/crunch\E ";
674       while (my ($env_key, $env_val) = each %ENV)
675       {
676         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
677           if ($env_key eq "TASK_WORK") {
678             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
679           }
680           elsif ($env_key eq "TASK_KEEPMOUNT") {
681             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
682           }
683           else {
684             $command .= "--env=\Q$env_key=$env_val\E ";
685           }
686         }
687       }
688       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
689       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
690       $command .= "\Q$docker_hash\E ";
691       $command .= "stdbuf --output=0 --error=0 ";
692       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
693     } else {
694       # Non-docker run
695       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
696       $command .= "stdbuf --output=0 --error=0 ";
697       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
698     }
699
700     my @execargs = ('bash', '-c', $command);
701     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
702     # exec() failed, we assume nothing happened.
703     Log(undef, "srun() failed on build script");
704     die;
705   }
706   close("writer");
707   if (!defined $childpid)
708   {
709     close $reader{$id};
710     delete $reader{$id};
711     next;
712   }
713   shift @freeslot;
714   $proc{$childpid} = { jobstep => $id,
715                        time => time,
716                        slot => $childslot,
717                        jobstepname => "$job_id.$id.$childpid",
718                      };
719   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
720   $slot[$childslot]->{pid} = $childpid;
721
722   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
723   Log ($id, "child $childpid started on $childslotname");
724   $Jobstep->{starttime} = time;
725   $Jobstep->{node} = $childnode->{name};
726   $Jobstep->{slotindex} = $childslot;
727   delete $Jobstep->{stderr};
728   delete $Jobstep->{finishtime};
729
730   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
731   $Jobstep->{'arvados_task'}->save;
732
733   splice @jobstep_todo, $todo_ptr, 1;
734   --$todo_ptr;
735
736   $progress_is_dirty = 1;
737
738   while (!@freeslot
739          ||
740          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
741   {
742     last THISROUND if $main::please_freeze;
743     if ($main::please_info)
744     {
745       $main::please_info = 0;
746       freeze();
747       collate_output();
748       save_meta(1);
749       update_progress_stats();
750     }
751     my $gotsome
752         = readfrompipes ()
753         + reapchildren ();
754     if (!$gotsome)
755     {
756       check_refresh_wanted();
757       check_squeue();
758       update_progress_stats();
759       select (undef, undef, undef, 0.1);
760     }
761     elsif (time - $progress_stats_updated >= 30)
762     {
763       update_progress_stats();
764     }
765     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
766         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
767     {
768       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
769           .($thisround_failed+$thisround_succeeded)
770           .") -- giving up on this round";
771       Log (undef, $message);
772       last THISROUND;
773     }
774
775     # move slots from freeslot to holdslot (or back to freeslot) if necessary
776     for (my $i=$#freeslot; $i>=0; $i--) {
777       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
778         push @holdslot, (splice @freeslot, $i, 1);
779       }
780     }
781     for (my $i=$#holdslot; $i>=0; $i--) {
782       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
783         push @freeslot, (splice @holdslot, $i, 1);
784       }
785     }
786
787     # give up if no nodes are succeeding
788     if (!grep { $_->{node}->{losing_streak} == 0 &&
789                     $_->{node}->{hold_count} < 4 } @slot) {
790       my $message = "Every node has failed -- giving up on this round";
791       Log (undef, $message);
792       last THISROUND;
793     }
794   }
795 }
796
797
798 push @freeslot, splice @holdslot;
799 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
800
801
802 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
803 while (%proc)
804 {
805   if ($main::please_continue) {
806     $main::please_continue = 0;
807     goto THISROUND;
808   }
809   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
810   readfrompipes ();
811   if (!reapchildren())
812   {
813     check_refresh_wanted();
814     check_squeue();
815     update_progress_stats();
816     select (undef, undef, undef, 0.1);
817     killem (keys %proc) if $main::please_freeze;
818   }
819 }
820
821 update_progress_stats();
822 freeze_if_want_freeze();
823
824
825 if (!defined $main::success)
826 {
827   if (@jobstep_todo &&
828       $thisround_succeeded == 0 &&
829       ($thisround_failed == 0 || $thisround_failed > 4))
830   {
831     my $message = "stop because $thisround_failed tasks failed and none succeeded";
832     Log (undef, $message);
833     $main::success = 0;
834   }
835   if (!@jobstep_todo)
836   {
837     $main::success = 1;
838   }
839 }
840
841 goto ONELEVEL if !defined $main::success;
842
843
844 release_allocation();
845 freeze();
846 my $collated_output = &collate_output();
847
848 if (!$collated_output) {
849   Log(undef, "output undef");
850 }
851 else {
852   eval {
853     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
854         or die "failed to get collated manifest: $!";
855     my $orig_manifest_text = '';
856     while (my $manifest_line = <$orig_manifest>) {
857       $orig_manifest_text .= $manifest_line;
858     }
859     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
860       'manifest_text' => $orig_manifest_text,
861     });
862     Log(undef, "output uuid " . $output->{uuid});
863     Log(undef, "output hash " . $output->{portable_data_hash});
864     $Job->update_attributes('output' => $output->{portable_data_hash});
865   };
866   if ($@) {
867     Log (undef, "Failed to register output manifest: $@");
868   }
869 }
870
871 Log (undef, "finish");
872
873 save_meta();
874
875 my $final_state;
876 if ($collated_output && $main::success) {
877   $final_state = 'Complete';
878 } else {
879   $final_state = 'Failed';
880 }
881 $Job->update_attributes('state' => $final_state);
882
883 exit (($final_state eq 'Complete') ? 0 : 1);
884
885
886
887 sub update_progress_stats
888 {
889   $progress_stats_updated = time;
890   return if !$progress_is_dirty;
891   my ($todo, $done, $running) = (scalar @jobstep_todo,
892                                  scalar @jobstep_done,
893                                  scalar @slot - scalar @freeslot - scalar @holdslot);
894   $Job->{'tasks_summary'} ||= {};
895   $Job->{'tasks_summary'}->{'todo'} = $todo;
896   $Job->{'tasks_summary'}->{'done'} = $done;
897   $Job->{'tasks_summary'}->{'running'} = $running;
898   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
899   Log (undef, "status: $done done, $running running, $todo todo");
900   $progress_is_dirty = 0;
901 }
902
903
904
905 sub reapchildren
906 {
907   my $pid = waitpid (-1, WNOHANG);
908   return 0 if $pid <= 0;
909
910   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
911                   . "."
912                   . $slot[$proc{$pid}->{slot}]->{cpu});
913   my $jobstepid = $proc{$pid}->{jobstep};
914   my $elapsed = time - $proc{$pid}->{time};
915   my $Jobstep = $jobstep[$jobstepid];
916
917   my $childstatus = $?;
918   my $exitvalue = $childstatus >> 8;
919   my $exitinfo = sprintf("exit %d signal %d%s",
920                          $exitvalue,
921                          $childstatus & 127,
922                          ($childstatus & 128 ? ' core dump' : ''));
923   $Jobstep->{'arvados_task'}->reload;
924   my $task_success = $Jobstep->{'arvados_task'}->{success};
925
926   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
927
928   if (!defined $task_success) {
929     # task did not indicate one way or the other --> fail
930     $Jobstep->{'arvados_task'}->{success} = 0;
931     $Jobstep->{'arvados_task'}->save;
932     $task_success = 0;
933   }
934
935   if (!$task_success)
936   {
937     my $temporary_fail;
938     $temporary_fail ||= $Jobstep->{node_fail};
939     $temporary_fail ||= ($exitvalue == 111);
940
941     ++$thisround_failed;
942     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
943
944     # Check for signs of a failed or misconfigured node
945     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
946         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
947       # Don't count this against jobstep failure thresholds if this
948       # node is already suspected faulty and srun exited quickly
949       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
950           $elapsed < 5) {
951         Log ($jobstepid, "blaming failure on suspect node " .
952              $slot[$proc{$pid}->{slot}]->{node}->{name});
953         $temporary_fail ||= 1;
954       }
955       ban_node_by_slot($proc{$pid}->{slot});
956     }
957
958     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
959                              ++$Jobstep->{'failures'},
960                              $temporary_fail ? 'temporary ' : 'permanent',
961                              $elapsed));
962
963     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
964       # Give up on this task, and the whole job
965       $main::success = 0;
966       $main::please_freeze = 1;
967     }
968     # Put this task back on the todo queue
969     push @jobstep_todo, $jobstepid;
970     $Job->{'tasks_summary'}->{'failed'}++;
971   }
972   else
973   {
974     ++$thisround_succeeded;
975     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
976     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
977     push @jobstep_done, $jobstepid;
978     Log ($jobstepid, "success in $elapsed seconds");
979   }
980   $Jobstep->{exitcode} = $childstatus;
981   $Jobstep->{finishtime} = time;
982   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
983   $Jobstep->{'arvados_task'}->save;
984   process_stderr ($jobstepid, $task_success);
985   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
986
987   close $reader{$jobstepid};
988   delete $reader{$jobstepid};
989   delete $slot[$proc{$pid}->{slot}]->{pid};
990   push @freeslot, $proc{$pid}->{slot};
991   delete $proc{$pid};
992
993   if ($task_success) {
994     # Load new tasks
995     my $newtask_list = [];
996     my $newtask_results;
997     do {
998       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
999         'where' => {
1000           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1001         },
1002         'order' => 'qsequence',
1003         'offset' => scalar(@$newtask_list),
1004       );
1005       push(@$newtask_list, @{$newtask_results->{items}});
1006     } while (@{$newtask_results->{items}});
1007     foreach my $arvados_task (@$newtask_list) {
1008       my $jobstep = {
1009         'level' => $arvados_task->{'sequence'},
1010         'failures' => 0,
1011         'arvados_task' => $arvados_task
1012       };
1013       push @jobstep, $jobstep;
1014       push @jobstep_todo, $#jobstep;
1015     }
1016   }
1017
1018   $progress_is_dirty = 1;
1019   1;
1020 }
1021
1022 sub check_refresh_wanted
1023 {
1024   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1025   if (@stat && $stat[9] > $latest_refresh) {
1026     $latest_refresh = scalar time;
1027     my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1028     for my $attr ('cancelled_at',
1029                   'cancelled_by_user_uuid',
1030                   'cancelled_by_client_uuid',
1031                   'state') {
1032       $Job->{$attr} = $Job2->{$attr};
1033     }
1034     if ($Job->{'state'} ne "Running") {
1035       if ($Job->{'state'} eq "Cancelled") {
1036         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1037       } else {
1038         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1039       }
1040       $main::success = 0;
1041       $main::please_freeze = 1;
1042     }
1043   }
1044 }
1045
1046 sub check_squeue
1047 {
1048   # return if the kill list was checked <4 seconds ago
1049   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1050   {
1051     return;
1052   }
1053   $squeue_kill_checked = time;
1054
1055   # use killem() on procs whose killtime is reached
1056   for (keys %proc)
1057   {
1058     if (exists $proc{$_}->{killtime}
1059         && $proc{$_}->{killtime} <= time)
1060     {
1061       killem ($_);
1062     }
1063   }
1064
1065   # return if the squeue was checked <60 seconds ago
1066   if (defined $squeue_checked && $squeue_checked > time - 60)
1067   {
1068     return;
1069   }
1070   $squeue_checked = time;
1071
1072   if (!$have_slurm)
1073   {
1074     # here is an opportunity to check for mysterious problems with local procs
1075     return;
1076   }
1077
1078   # get a list of steps still running
1079   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1080   chop @squeue;
1081   if ($squeue[-1] ne "ok")
1082   {
1083     return;
1084   }
1085   pop @squeue;
1086
1087   # which of my jobsteps are running, according to squeue?
1088   my %ok;
1089   foreach (@squeue)
1090   {
1091     if (/^(\d+)\.(\d+) (\S+)/)
1092     {
1093       if ($1 eq $ENV{SLURM_JOBID})
1094       {
1095         $ok{$3} = 1;
1096       }
1097     }
1098   }
1099
1100   # which of my active child procs (>60s old) were not mentioned by squeue?
1101   foreach (keys %proc)
1102   {
1103     if ($proc{$_}->{time} < time - 60
1104         && !exists $ok{$proc{$_}->{jobstepname}}
1105         && !exists $proc{$_}->{killtime})
1106     {
1107       # kill this proc if it hasn't exited in 30 seconds
1108       $proc{$_}->{killtime} = time + 30;
1109     }
1110   }
1111 }
1112
1113
1114 sub release_allocation
1115 {
1116   if ($have_slurm)
1117   {
1118     Log (undef, "release job allocation");
1119     system "scancel $ENV{SLURM_JOBID}";
1120   }
1121 }
1122
1123
1124 sub readfrompipes
1125 {
1126   my $gotsome = 0;
1127   foreach my $job (keys %reader)
1128   {
1129     my $buf;
1130     while (0 < sysread ($reader{$job}, $buf, 8192))
1131     {
1132       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1133       $jobstep[$job]->{stderr} .= $buf;
1134       preprocess_stderr ($job);
1135       if (length ($jobstep[$job]->{stderr}) > 16384)
1136       {
1137         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1138       }
1139       $gotsome = 1;
1140     }
1141   }
1142   return $gotsome;
1143 }
1144
1145
1146 sub preprocess_stderr
1147 {
1148   my $job = shift;
1149
1150   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1151     my $line = $1;
1152     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1153     Log ($job, "stderr $line");
1154     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1155       # whoa.
1156       $main::please_freeze = 1;
1157     }
1158     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1159       $jobstep[$job]->{node_fail} = 1;
1160       ban_node_by_slot($jobstep[$job]->{slotindex});
1161     }
1162   }
1163 }
1164
1165
1166 sub process_stderr
1167 {
1168   my $job = shift;
1169   my $task_success = shift;
1170   preprocess_stderr ($job);
1171
1172   map {
1173     Log ($job, "stderr $_");
1174   } split ("\n", $jobstep[$job]->{stderr});
1175 }
1176
1177 sub fetch_block
1178 {
1179   my $hash = shift;
1180   my ($keep, $child_out, $output_block);
1181
1182   my $cmd = "arv-get \Q$hash\E";
1183   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1184   $output_block = '';
1185   while (1) {
1186     my $buf;
1187     my $bytes = sysread($keep, $buf, 1024 * 1024);
1188     if (!defined $bytes) {
1189       die "reading from arv-get: $!";
1190     } elsif ($bytes == 0) {
1191       # sysread returns 0 at the end of the pipe.
1192       last;
1193     } else {
1194       # some bytes were read into buf.
1195       $output_block .= $buf;
1196     }
1197   }
1198   close $keep;
1199   return $output_block;
1200 }
1201
1202 sub collate_output
1203 {
1204   Log (undef, "collate");
1205
1206   my ($child_out, $child_in);
1207   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1208                   '--retries', put_retry_count());
1209   my $joboutput;
1210   for (@jobstep)
1211   {
1212     next if (!exists $_->{'arvados_task'}->{'output'} ||
1213              !$_->{'arvados_task'}->{'success'});
1214     my $output = $_->{'arvados_task'}->{output};
1215     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1216     {
1217       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1218       print $child_in $output;
1219     }
1220     elsif (@jobstep == 1)
1221     {
1222       $joboutput = $output;
1223       last;
1224     }
1225     elsif (defined (my $outblock = fetch_block ($output)))
1226     {
1227       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1228       print $child_in $outblock;
1229     }
1230     else
1231     {
1232       Log (undef, "XXX fetch_block($output) failed XXX");
1233       $main::success = 0;
1234     }
1235   }
1236   $child_in->close;
1237
1238   if (!defined $joboutput) {
1239     my $s = IO::Select->new($child_out);
1240     if ($s->can_read(120)) {
1241       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1242       chomp($joboutput);
1243     } else {
1244       Log (undef, "timed out reading from 'arv-put'");
1245     }
1246   }
1247   waitpid($pid, 0);
1248
1249   return $joboutput;
1250 }
1251
1252
1253 sub killem
1254 {
1255   foreach (@_)
1256   {
1257     my $sig = 2;                # SIGINT first
1258     if (exists $proc{$_}->{"sent_$sig"} &&
1259         time - $proc{$_}->{"sent_$sig"} > 4)
1260     {
1261       $sig = 15;                # SIGTERM if SIGINT doesn't work
1262     }
1263     if (exists $proc{$_}->{"sent_$sig"} &&
1264         time - $proc{$_}->{"sent_$sig"} > 4)
1265     {
1266       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1267     }
1268     if (!exists $proc{$_}->{"sent_$sig"})
1269     {
1270       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1271       kill $sig, $_;
1272       select (undef, undef, undef, 0.1);
1273       if ($sig == 2)
1274       {
1275         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1276       }
1277       $proc{$_}->{"sent_$sig"} = time;
1278       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1279     }
1280   }
1281 }
1282
1283
1284 sub fhbits
1285 {
1286   my($bits);
1287   for (@_) {
1288     vec($bits,fileno($_),1) = 1;
1289   }
1290   $bits;
1291 }
1292
1293
1294 sub Log                         # ($jobstep_id, $logmessage)
1295 {
1296   if ($_[1] =~ /\n/) {
1297     for my $line (split (/\n/, $_[1])) {
1298       Log ($_[0], $line);
1299     }
1300     return;
1301   }
1302   my $fh = select STDERR; $|=1; select $fh;
1303   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1304   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1305   $message .= "\n";
1306   my $datetime;
1307   if ($local_logfile || -t STDERR) {
1308     my @gmtime = gmtime;
1309     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1310                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1311   }
1312   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1313
1314   if ($local_logfile) {
1315     print $local_logfile $datetime . " " . $message;
1316   }
1317 }
1318
1319
1320 sub croak
1321 {
1322   my ($package, $file, $line) = caller;
1323   my $message = "@_ at $file line $line\n";
1324   Log (undef, $message);
1325   freeze() if @jobstep_todo;
1326   collate_output() if @jobstep_todo;
1327   cleanup() if $Job;
1328   save_meta() if $local_logfile;
1329   die;
1330 }
1331
1332
1333 sub cleanup
1334 {
1335   if ($Job->{'state'} eq 'Cancelled') {
1336     $Job->update_attributes('finished_at' => scalar gmtime);
1337   } else {
1338     $Job->update_attributes('state' => 'Failed');
1339   }
1340 }
1341
1342
1343 sub save_meta
1344 {
1345   my $justcheckpoint = shift; # false if this will be the last meta saved
1346   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1347
1348   $local_logfile->flush;
1349   my $retry_count = put_retry_count();
1350   my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1351       "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1352   my $loglocator = `$cmd`;
1353   die "system $cmd failed: $?" if $?;
1354   chomp($loglocator);
1355
1356   $local_logfile = undef;   # the temp file is automatically deleted
1357   Log (undef, "log manifest is $loglocator");
1358   $Job->{'log'} = $loglocator;
1359   $Job->update_attributes('log', $loglocator);
1360 }
1361
1362
1363 sub freeze_if_want_freeze
1364 {
1365   if ($main::please_freeze)
1366   {
1367     release_allocation();
1368     if (@_)
1369     {
1370       # kill some srun procs before freeze+stop
1371       map { $proc{$_} = {} } @_;
1372       while (%proc)
1373       {
1374         killem (keys %proc);
1375         select (undef, undef, undef, 0.1);
1376         my $died;
1377         while (($died = waitpid (-1, WNOHANG)) > 0)
1378         {
1379           delete $proc{$died};
1380         }
1381       }
1382     }
1383     freeze();
1384     collate_output();
1385     cleanup();
1386     save_meta();
1387     exit 1;
1388   }
1389 }
1390
1391
1392 sub freeze
1393 {
1394   Log (undef, "Freeze not implemented");
1395   return;
1396 }
1397
1398
1399 sub thaw
1400 {
1401   croak ("Thaw not implemented");
1402 }
1403
1404
1405 sub freezequote
1406 {
1407   my $s = shift;
1408   $s =~ s/\\/\\\\/g;
1409   $s =~ s/\n/\\n/g;
1410   return $s;
1411 }
1412
1413
1414 sub freezeunquote
1415 {
1416   my $s = shift;
1417   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1418   return $s;
1419 }
1420
1421
1422 sub srun
1423 {
1424   my $srunargs = shift;
1425   my $execargs = shift;
1426   my $opts = shift || {};
1427   my $stdin = shift;
1428   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1429   print STDERR (join (" ",
1430                       map { / / ? "'$_'" : $_ }
1431                       (@$args)),
1432                 "\n")
1433       if $ENV{CRUNCH_DEBUG};
1434
1435   if (defined $stdin) {
1436     my $child = open STDIN, "-|";
1437     defined $child or die "no fork: $!";
1438     if ($child == 0) {
1439       print $stdin or die $!;
1440       close STDOUT or die $!;
1441       exit 0;
1442     }
1443   }
1444
1445   return system (@$args) if $opts->{fork};
1446
1447   exec @$args;
1448   warn "ENV size is ".length(join(" ",%ENV));
1449   die "exec failed: $!: @$args";
1450 }
1451
1452
1453 sub ban_node_by_slot {
1454   # Don't start any new jobsteps on this node for 60 seconds
1455   my $slotid = shift;
1456   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1457   $slot[$slotid]->{node}->{hold_count}++;
1458   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1459 }
1460
1461 sub must_lock_now
1462 {
1463   my ($lockfile, $error_message) = @_;
1464   open L, ">", $lockfile or croak("$lockfile: $!");
1465   if (!flock L, LOCK_EX|LOCK_NB) {
1466     croak("Can't lock $lockfile: $error_message\n");
1467   }
1468 }
1469
1470 sub find_docker_image {
1471   # Given a Keep locator, check to see if it contains a Docker image.
1472   # If so, return its stream name and Docker hash.
1473   # If not, return undef for both values.
1474   my $locator = shift;
1475   my ($streamname, $filename);
1476   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1477     foreach my $line (split(/\n/, $image->{manifest_text})) {
1478       my @tokens = split(/\s+/, $line);
1479       next if (!@tokens);
1480       $streamname = shift(@tokens);
1481       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1482         if (defined($filename)) {
1483           return (undef, undef);  # More than one file in the Collection.
1484         } else {
1485           $filename = (split(/:/, $filedata, 3))[2];
1486         }
1487       }
1488     }
1489   }
1490   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1491     return ($streamname, $1);
1492   } else {
1493     return (undef, undef);
1494   }
1495 }
1496
1497 sub put_retry_count {
1498   # Calculate a --retries argument for arv-put that will have it try
1499   # approximately as long as this Job has been running.
1500   my $stoptime = shift || time;
1501   my $starttime = $jobstep[0]->{starttime};
1502   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1503   my $retries = 0;
1504   while ($timediff >= 2) {
1505     $retries++;
1506     $timediff /= 2;
1507   }
1508   return ($retries > 3) ? $retries : 3;
1509 }
1510
1511 __DATA__
1512 #!/usr/bin/perl
1513
1514 # checkout-and-build
1515
1516 use Fcntl ':flock';
1517 use File::Path qw( make_path );
1518
1519 my $destdir = $ENV{"CRUNCH_SRC"};
1520 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1521 my $repo = $ENV{"CRUNCH_SRC_URL"};
1522 my $task_work = $ENV{"TASK_WORK"};
1523
1524 for my $dir ($destdir, $task_work) {
1525     if ($dir) {
1526         make_path $dir;
1527         -e $dir or die "Failed to create temporary directory ($dir): $!";
1528     }
1529 }
1530
1531 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1532 flock L, LOCK_EX;
1533 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1534     if (@ARGV) {
1535         exec(@ARGV);
1536         die "Cannot exec `@ARGV`: $!";
1537     } else {
1538         exit 0;
1539     }
1540 }
1541
1542 unlink "$destdir.commit";
1543 open STDOUT, ">", "$destdir.log";
1544 open STDERR, ">&STDOUT";
1545
1546 mkdir $destdir;
1547 my @git_archive_data = <DATA>;
1548 if (@git_archive_data) {
1549   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1550   print TARX @git_archive_data;
1551   if(!close(TARX)) {
1552     die "'tar -C $destdir -xf -' exited $?: $!";
1553   }
1554 }
1555
1556 my $pwd;
1557 chomp ($pwd = `pwd`);
1558 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1559 mkdir $install_dir;
1560
1561 for my $src_path ("$destdir/arvados/sdk/python") {
1562   if (-d $src_path) {
1563     shell_or_die ("virtualenv", $install_dir);
1564     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1565   }
1566 }
1567
1568 if (-e "$destdir/crunch_scripts/install") {
1569     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1570 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1571     # Old version
1572     shell_or_die ("./tests/autotests.sh", $install_dir);
1573 } elsif (-e "./install.sh") {
1574     shell_or_die ("./install.sh", $install_dir);
1575 }
1576
1577 if ($commit) {
1578     unlink "$destdir.commit.new";
1579     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1580     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1581 }
1582
1583 close L;
1584
1585 if (@ARGV) {
1586     exec(@ARGV);
1587     die "Cannot exec `@ARGV`: $!";
1588 } else {
1589     exit 0;
1590 }
1591
1592 sub shell_or_die
1593 {
1594   if ($ENV{"DEBUG"}) {
1595     print STDERR "@_\n";
1596   }
1597   system (@_) == 0
1598       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1599 }
1600
1601 __DATA__