3825: write log output directly to a pipe.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
130
131
132 $SIG{'USR1'} = sub
133 {
134   $main::ENV{CRUNCH_DEBUG} = 1;
135 };
136 $SIG{'USR2'} = sub
137 {
138   $main::ENV{CRUNCH_DEBUG} = 0;
139 };
140
141
142
143 my $arv = Arvados->new('apiVersion' => 'v1');
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # If some other crunch-job process has grabbed this job (or we see
156     # other evidence that the job is already underway) we exit
157     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158     # mark the job as failed.
159     if ($Job->{'is_locked_by_uuid'}) {
160       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
161       exit EX_TEMPFAIL;
162     }
163     if ($Job->{'state'} ne 'Queued') {
164       Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
165       exit EX_TEMPFAIL;
166     }
167     if ($Job->{'success'} ne undef) {
168       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
169       exit EX_TEMPFAIL;
170     }
171     if ($Job->{'running'}) {
172       Log(undef, "Job 'running' flag is already set");
173       exit EX_TEMPFAIL;
174     }
175     if ($Job->{'started_at'}) {
176       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
177       exit EX_TEMPFAIL;
178     }
179   }
180 }
181 else
182 {
183   $Job = JSON::decode_json($jobspec);
184
185   if (!$resume_stash)
186   {
187     map { croak ("No $_ specified") unless $Job->{$_} }
188     qw(script script_version script_parameters);
189   }
190
191   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
192   $Job->{'started_at'} = gmtime;
193
194   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
195
196   $job_has_uuid = 1;
197 }
198 $job_id = $Job->{'uuid'};
199
200 my $keep_logfile = $job_id . '.log.txt';
201 start_output_log($keep_logfile);
202
203 $Job->{'runtime_constraints'} ||= {};
204 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
205 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
206
207
208 Log (undef, "check slurm allocation");
209 my @slot;
210 my @node;
211 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
212 my @sinfo;
213 if (!$have_slurm)
214 {
215   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
216   push @sinfo, "$localcpus localhost";
217 }
218 if (exists $ENV{SLURM_NODELIST})
219 {
220   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
221 }
222 foreach (@sinfo)
223 {
224   my ($ncpus, $slurm_nodelist) = split;
225   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
226
227   my @nodelist;
228   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
229   {
230     my $nodelist = $1;
231     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
232     {
233       my $ranges = $1;
234       foreach (split (",", $ranges))
235       {
236         my ($a, $b);
237         if (/(\d+)-(\d+)/)
238         {
239           $a = $1;
240           $b = $2;
241         }
242         else
243         {
244           $a = $_;
245           $b = $_;
246         }
247         push @nodelist, map {
248           my $n = $nodelist;
249           $n =~ s/\[[-,\d]+\]/$_/;
250           $n;
251         } ($a..$b);
252       }
253     }
254     else
255     {
256       push @nodelist, $nodelist;
257     }
258   }
259   foreach my $nodename (@nodelist)
260   {
261     Log (undef, "node $nodename - $ncpus slots");
262     my $node = { name => $nodename,
263                  ncpus => $ncpus,
264                  losing_streak => 0,
265                  hold_until => 0 };
266     foreach my $cpu (1..$ncpus)
267     {
268       push @slot, { node => $node,
269                     cpu => $cpu };
270     }
271   }
272   push @node, @nodelist;
273 }
274
275
276
277 # Ensure that we get one jobstep running on each allocated node before
278 # we start overloading nodes with concurrent steps
279
280 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
281
282
283
284 my $jobmanager_id;
285 if ($job_has_uuid)
286 {
287   # Claim this job, and make sure nobody else does
288   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
289           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
290     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
291     exit EX_TEMPFAIL;
292   }
293   $Job->update_attributes('state' => 'Running',
294                           'tasks_summary' => { 'failed' => 0,
295                                                'todo' => 1,
296                                                'running' => 0,
297                                                'done' => 0 });
298 }
299
300
301 Log (undef, "start");
302 $SIG{'INT'} = sub { $main::please_freeze = 1; };
303 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
304 $SIG{'TERM'} = \&croak;
305 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
306 $SIG{'ALRM'} = sub { $main::please_info = 1; };
307 $SIG{'CONT'} = sub { $main::please_continue = 1; };
308 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
309
310 $main::please_freeze = 0;
311 $main::please_info = 0;
312 $main::please_continue = 0;
313 $main::please_refresh = 0;
314 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
315
316 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
317 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
318 $ENV{"JOB_UUID"} = $job_id;
319
320
321 my @jobstep;
322 my @jobstep_todo = ();
323 my @jobstep_done = ();
324 my @jobstep_tomerge = ();
325 my $jobstep_tomerge_level = 0;
326 my $squeue_checked;
327 my $squeue_kill_checked;
328 my $output_in_keep = 0;
329 my $latest_refresh = scalar time;
330
331
332
333 if (defined $Job->{thawedfromkey})
334 {
335   thaw ($Job->{thawedfromkey});
336 }
337 else
338 {
339   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
340     'job_uuid' => $Job->{'uuid'},
341     'sequence' => 0,
342     'qsequence' => 0,
343     'parameters' => {},
344                                                           });
345   push @jobstep, { 'level' => 0,
346                    'failures' => 0,
347                    'arvados_task' => $first_task,
348                  };
349   push @jobstep_todo, 0;
350 }
351
352
353 if (!$have_slurm)
354 {
355   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
356 }
357
358
359 my $build_script;
360
361
362 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
363
364 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
365 if ($skip_install)
366 {
367   if (!defined $no_clear_tmp) {
368     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
369     system($clear_tmp_cmd) == 0
370         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
371   }
372   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
373   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
374     if (-d $src_path) {
375       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
376           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
377       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
378           == 0
379           or croak ("setup.py in $src_path failed: exit ".($?>>8));
380     }
381   }
382 }
383 else
384 {
385   do {
386     local $/ = undef;
387     $build_script = <DATA>;
388   };
389   Log (undef, "Install revision ".$Job->{script_version});
390   my $nodelist = join(",", @node);
391
392   if (!defined $no_clear_tmp) {
393     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
394
395     my $cleanpid = fork();
396     if ($cleanpid == 0)
397     {
398       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
399             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
400       exit (1);
401     }
402     while (1)
403     {
404       last if $cleanpid == waitpid (-1, WNOHANG);
405       freeze_if_want_freeze ($cleanpid);
406       select (undef, undef, undef, 0.1);
407     }
408     Log (undef, "Clean-work-dir exited $?");
409   }
410
411   # Install requested code version
412
413   my @execargs;
414   my @srunargs = ("srun",
415                   "--nodelist=$nodelist",
416                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
417
418   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
419   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
420
421   my $commit;
422   my $git_archive;
423   my $treeish = $Job->{'script_version'};
424
425   # If we're running under crunch-dispatch, it will have pulled the
426   # appropriate source tree into its own repository, and given us that
427   # repo's path as $git_dir. If we're running a "local" job, and a
428   # script_version was specified, it's up to the user to provide the
429   # full path to a local repository in Job->{repository}.
430   #
431   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
432   # git-archive --remote where appropriate.
433   #
434   # TODO: Accept a locally-hosted Arvados repository by name or
435   # UUID. Use arvados.v1.repositories.list or .get to figure out the
436   # appropriate fetch-url.
437   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
438
439   $ENV{"CRUNCH_SRC_URL"} = $repo;
440
441   if (-d "$repo/.git") {
442     # We were given a working directory, but we are only interested in
443     # the index.
444     $repo = "$repo/.git";
445   }
446
447   # If this looks like a subversion r#, look for it in git-svn commit messages
448
449   if ($treeish =~ m{^\d{1,4}$}) {
450     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
451     chomp $gitlog;
452     Log(undef, "git Subversion search exited $?");
453     if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
454       $commit = $gitlog;
455       Log(undef, "Using commit $commit for Subversion revision $treeish");
456     }
457   }
458
459   # If that didn't work, try asking git to look it up as a tree-ish.
460
461   if (!defined $commit) {
462     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
463     chomp $found;
464     Log(undef, "git rev-list exited $? with result '$found'");
465     if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
466       $commit = $found;
467       Log(undef, "Using commit $commit for tree-ish $treeish");
468       if ($commit ne $treeish) {
469         # Make sure we record the real commit id in the database,
470         # frozentokey, logs, etc. -- instead of an abbreviation or a
471         # branch name which can become ambiguous or point to a
472         # different commit in the future.
473         $Job->{'script_version'} = $commit;
474         !$job_has_uuid or
475             $Job->update_attributes('script_version' => $commit) or
476             croak("Error while updating job");
477       }
478     }
479   }
480
481   if (defined $commit) {
482     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
483     @execargs = ("sh", "-c",
484                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
485     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
486     croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
487   }
488   else {
489     croak ("could not figure out commit id for $treeish");
490   }
491
492   # Note: this section is almost certainly unnecessary if we're
493   # running tasks in docker containers.
494   my $installpid = fork();
495   if ($installpid == 0)
496   {
497     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
498     exit (1);
499   }
500   while (1)
501   {
502     last if $installpid == waitpid (-1, WNOHANG);
503     freeze_if_want_freeze ($installpid);
504     select (undef, undef, undef, 0.1);
505   }
506   Log (undef, "Install exited $?");
507 }
508
509 if (!$have_slurm)
510 {
511   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
512   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
513 }
514
515 # If this job requires a Docker image, install that.
516 my $docker_bin = "/usr/bin/docker.io";
517 my ($docker_locator, $docker_stream, $docker_hash);
518 if ($docker_locator = $Job->{docker_image_locator}) {
519   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
520   if (!$docker_hash)
521   {
522     croak("No Docker image hash found from locator $docker_locator");
523   }
524   $docker_stream =~ s/^\.//;
525   my $docker_install_script = qq{
526 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
527     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
528 fi
529 };
530   my $docker_pid = fork();
531   if ($docker_pid == 0)
532   {
533     srun (["srun", "--nodelist=" . join(',', @node)],
534           ["/bin/sh", "-ec", $docker_install_script]);
535     exit ($?);
536   }
537   while (1)
538   {
539     last if $docker_pid == waitpid (-1, WNOHANG);
540     freeze_if_want_freeze ($docker_pid);
541     select (undef, undef, undef, 0.1);
542   }
543   if ($? != 0)
544   {
545     croak("Installing Docker image from $docker_locator returned exit code $?");
546   }
547 }
548
549 foreach (qw (script script_version script_parameters runtime_constraints))
550 {
551   Log (undef,
552        "$_ " .
553        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
554 }
555 foreach (split (/\n/, $Job->{knobs}))
556 {
557   Log (undef, "knob " . $_);
558 }
559
560
561
562 $main::success = undef;
563
564
565
566 ONELEVEL:
567
568 my $thisround_succeeded = 0;
569 my $thisround_failed = 0;
570 my $thisround_failed_multiple = 0;
571
572 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
573                        or $a <=> $b } @jobstep_todo;
574 my $level = $jobstep[$jobstep_todo[0]]->{level};
575 Log (undef, "start level $level");
576
577
578
579 my %proc;
580 my @freeslot = (0..$#slot);
581 my @holdslot;
582 my %reader;
583 my $progress_is_dirty = 1;
584 my $progress_stats_updated = 0;
585
586 update_progress_stats();
587
588
589
590 THISROUND:
591 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
592 {
593   my $id = $jobstep_todo[$todo_ptr];
594   my $Jobstep = $jobstep[$id];
595   if ($Jobstep->{level} != $level)
596   {
597     next;
598   }
599
600   pipe $reader{$id}, "writer" or croak ($!);
601   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
602   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
603
604   my $childslot = $freeslot[0];
605   my $childnode = $slot[$childslot]->{node};
606   my $childslotname = join (".",
607                             $slot[$childslot]->{node}->{name},
608                             $slot[$childslot]->{cpu});
609   my $childpid = fork();
610   if ($childpid == 0)
611   {
612     $SIG{'INT'} = 'DEFAULT';
613     $SIG{'QUIT'} = 'DEFAULT';
614     $SIG{'TERM'} = 'DEFAULT';
615
616     foreach (values (%reader))
617     {
618       close($_);
619     }
620     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
621     open(STDOUT,">&writer");
622     open(STDERR,">&writer");
623
624     undef $dbh;
625     undef $sth;
626
627     delete $ENV{"GNUPGHOME"};
628     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
629     $ENV{"TASK_QSEQUENCE"} = $id;
630     $ENV{"TASK_SEQUENCE"} = $level;
631     $ENV{"JOB_SCRIPT"} = $Job->{script};
632     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
633       $param =~ tr/a-z/A-Z/;
634       $ENV{"JOB_PARAMETER_$param"} = $value;
635     }
636     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
637     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
638     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
639     $ENV{"HOME"} = $ENV{"TASK_WORK"};
640     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
641     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
642     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
643     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
644
645     $ENV{"GZIP"} = "-n";
646
647     my @srunargs = (
648       "srun",
649       "--nodelist=".$childnode->{name},
650       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
651       "--job-name=$job_id.$id.$$",
652         );
653     my $build_script_to_send = "";
654     my $command =
655         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
656         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
657         ."&& cd $ENV{CRUNCH_TMP} ";
658     if ($build_script)
659     {
660       $build_script_to_send = $build_script;
661       $command .=
662           "&& perl -";
663     }
664     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
665     if ($docker_hash)
666     {
667       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
668       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
669       # Dynamically configure the container to use the host system as its
670       # DNS server.  Get the host's global addresses from the ip command,
671       # and turn them into docker --dns options using gawk.
672       $command .=
673           q{$(ip -o address show scope global |
674               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
675       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
676       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
677       $command .= "--env=\QHOME=/home/crunch\E ";
678       while (my ($env_key, $env_val) = each %ENV)
679       {
680         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
681           if ($env_key eq "TASK_WORK") {
682             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
683           }
684           elsif ($env_key eq "TASK_KEEPMOUNT") {
685             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
686           }
687           else {
688             $command .= "--env=\Q$env_key=$env_val\E ";
689           }
690         }
691       }
692       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
693       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
694       $command .= "\Q$docker_hash\E ";
695       $command .= "stdbuf --output=0 --error=0 ";
696       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
697     } else {
698       # Non-docker run
699       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
700       $command .= "stdbuf --output=0 --error=0 ";
701       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
702     }
703
704     my @execargs = ('bash', '-c', $command);
705     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
706     # exec() failed, we assume nothing happened.
707     Log(undef, "srun() failed on build script");
708     die;
709   }
710   close("writer");
711   if (!defined $childpid)
712   {
713     close $reader{$id};
714     delete $reader{$id};
715     next;
716   }
717   shift @freeslot;
718   $proc{$childpid} = { jobstep => $id,
719                        time => time,
720                        slot => $childslot,
721                        jobstepname => "$job_id.$id.$childpid",
722                      };
723   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
724   $slot[$childslot]->{pid} = $childpid;
725
726   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
727   Log ($id, "child $childpid started on $childslotname");
728   $Jobstep->{starttime} = time;
729   $Jobstep->{node} = $childnode->{name};
730   $Jobstep->{slotindex} = $childslot;
731   delete $Jobstep->{stderr};
732   delete $Jobstep->{finishtime};
733
734   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
735   $Jobstep->{'arvados_task'}->save;
736
737   splice @jobstep_todo, $todo_ptr, 1;
738   --$todo_ptr;
739
740   $progress_is_dirty = 1;
741
742   while (!@freeslot
743          ||
744          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
745   {
746     last THISROUND if $main::please_freeze;
747     if ($main::please_info)
748     {
749       $main::please_info = 0;
750       freeze();
751       collate_output();
752       save_meta(1);
753       update_progress_stats();
754     }
755     my $gotsome
756         = readfrompipes ()
757         + reapchildren ();
758     if (!$gotsome)
759     {
760       check_refresh_wanted();
761       check_squeue();
762       update_progress_stats();
763       select (undef, undef, undef, 0.1);
764     }
765     elsif (time - $progress_stats_updated >= 30)
766     {
767       update_progress_stats();
768     }
769     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
770         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
771     {
772       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
773           .($thisround_failed+$thisround_succeeded)
774           .") -- giving up on this round";
775       Log (undef, $message);
776       last THISROUND;
777     }
778
779     # move slots from freeslot to holdslot (or back to freeslot) if necessary
780     for (my $i=$#freeslot; $i>=0; $i--) {
781       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
782         push @holdslot, (splice @freeslot, $i, 1);
783       }
784     }
785     for (my $i=$#holdslot; $i>=0; $i--) {
786       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
787         push @freeslot, (splice @holdslot, $i, 1);
788       }
789     }
790
791     # give up if no nodes are succeeding
792     if (!grep { $_->{node}->{losing_streak} == 0 &&
793                     $_->{node}->{hold_count} < 4 } @slot) {
794       my $message = "Every node has failed -- giving up on this round";
795       Log (undef, $message);
796       last THISROUND;
797     }
798   }
799 }
800
801
802 push @freeslot, splice @holdslot;
803 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
804
805
806 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
807 while (%proc)
808 {
809   if ($main::please_continue) {
810     $main::please_continue = 0;
811     goto THISROUND;
812   }
813   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
814   readfrompipes ();
815   if (!reapchildren())
816   {
817     check_refresh_wanted();
818     check_squeue();
819     update_progress_stats();
820     select (undef, undef, undef, 0.1);
821     killem (keys %proc) if $main::please_freeze;
822   }
823 }
824
825 update_progress_stats();
826 freeze_if_want_freeze();
827
828
829 if (!defined $main::success)
830 {
831   if (@jobstep_todo &&
832       $thisround_succeeded == 0 &&
833       ($thisround_failed == 0 || $thisround_failed > 4))
834   {
835     my $message = "stop because $thisround_failed tasks failed and none succeeded";
836     Log (undef, $message);
837     $main::success = 0;
838   }
839   if (!@jobstep_todo)
840   {
841     $main::success = 1;
842   }
843 }
844
845 goto ONELEVEL if !defined $main::success;
846
847
848 release_allocation();
849 freeze();
850 my $collated_output = &collate_output();
851
852 if (!$collated_output) {
853   Log(undef, "output undef");
854 }
855 else {
856   eval {
857     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
858         or die "failed to get collated manifest: $!";
859     my $orig_manifest_text = '';
860     while (my $manifest_line = <$orig_manifest>) {
861       $orig_manifest_text .= $manifest_line;
862     }
863     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
864       'manifest_text' => $orig_manifest_text,
865     });
866     Log(undef, "output uuid " . $output->{uuid});
867     Log(undef, "output hash " . $output->{portable_data_hash});
868     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
869   };
870   if ($@) {
871     Log (undef, "Failed to register output manifest: $@");
872   }
873 }
874
875 Log (undef, "finish");
876
877 save_meta();
878
879 if ($job_has_uuid) {
880   if ($collated_output && $main::success) {
881     $Job->update_attributes('state' => 'Complete')
882   } else {
883     $Job->update_attributes('state' => 'Failed')
884   }
885 }
886
887 exit ($Job->{'state'} != 'Complete' ? 1 : 0);
888
889
890
891 sub update_progress_stats
892 {
893   $progress_stats_updated = time;
894   return if !$progress_is_dirty;
895   my ($todo, $done, $running) = (scalar @jobstep_todo,
896                                  scalar @jobstep_done,
897                                  scalar @slot - scalar @freeslot - scalar @holdslot);
898   $Job->{'tasks_summary'} ||= {};
899   $Job->{'tasks_summary'}->{'todo'} = $todo;
900   $Job->{'tasks_summary'}->{'done'} = $done;
901   $Job->{'tasks_summary'}->{'running'} = $running;
902   if ($job_has_uuid) {
903     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
904   }
905   Log (undef, "status: $done done, $running running, $todo todo");
906   $progress_is_dirty = 0;
907 }
908
909
910
911 sub reapchildren
912 {
913   my $pid = waitpid (-1, WNOHANG);
914   return 0 if $pid <= 0;
915
916   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
917                   . "."
918                   . $slot[$proc{$pid}->{slot}]->{cpu});
919   my $jobstepid = $proc{$pid}->{jobstep};
920   my $elapsed = time - $proc{$pid}->{time};
921   my $Jobstep = $jobstep[$jobstepid];
922
923   my $childstatus = $?;
924   my $exitvalue = $childstatus >> 8;
925   my $exitinfo = sprintf("exit %d signal %d%s",
926                          $exitvalue,
927                          $childstatus & 127,
928                          ($childstatus & 128 ? ' core dump' : ''));
929   $Jobstep->{'arvados_task'}->reload;
930   my $task_success = $Jobstep->{'arvados_task'}->{success};
931
932   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
933
934   if (!defined $task_success) {
935     # task did not indicate one way or the other --> fail
936     $Jobstep->{'arvados_task'}->{success} = 0;
937     $Jobstep->{'arvados_task'}->save;
938     $task_success = 0;
939   }
940
941   if (!$task_success)
942   {
943     my $temporary_fail;
944     $temporary_fail ||= $Jobstep->{node_fail};
945     $temporary_fail ||= ($exitvalue == 111);
946
947     ++$thisround_failed;
948     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
949
950     # Check for signs of a failed or misconfigured node
951     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
952         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
953       # Don't count this against jobstep failure thresholds if this
954       # node is already suspected faulty and srun exited quickly
955       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
956           $elapsed < 5) {
957         Log ($jobstepid, "blaming failure on suspect node " .
958              $slot[$proc{$pid}->{slot}]->{node}->{name});
959         $temporary_fail ||= 1;
960       }
961       ban_node_by_slot($proc{$pid}->{slot});
962     }
963
964     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
965                              ++$Jobstep->{'failures'},
966                              $temporary_fail ? 'temporary ' : 'permanent',
967                              $elapsed));
968
969     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
970       # Give up on this task, and the whole job
971       $main::success = 0;
972       $main::please_freeze = 1;
973     }
974     else {
975       # Put this task back on the todo queue
976       push @jobstep_todo, $jobstepid;
977     }
978     $Job->{'tasks_summary'}->{'failed'}++;
979   }
980   else
981   {
982     ++$thisround_succeeded;
983     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
984     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
985     push @jobstep_done, $jobstepid;
986     Log ($jobstepid, "success in $elapsed seconds");
987   }
988   $Jobstep->{exitcode} = $childstatus;
989   $Jobstep->{finishtime} = time;
990   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
991   $Jobstep->{'arvados_task'}->save;
992   process_stderr ($jobstepid, $task_success);
993   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
994
995   close $reader{$jobstepid};
996   delete $reader{$jobstepid};
997   delete $slot[$proc{$pid}->{slot}]->{pid};
998   push @freeslot, $proc{$pid}->{slot};
999   delete $proc{$pid};
1000
1001   if ($task_success) {
1002     # Load new tasks
1003     my $newtask_list = [];
1004     my $newtask_results;
1005     do {
1006       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1007         'where' => {
1008           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1009         },
1010         'order' => 'qsequence',
1011         'offset' => scalar(@$newtask_list),
1012       );
1013       push(@$newtask_list, @{$newtask_results->{items}});
1014     } while (@{$newtask_results->{items}});
1015     foreach my $arvados_task (@$newtask_list) {
1016       my $jobstep = {
1017         'level' => $arvados_task->{'sequence'},
1018         'failures' => 0,
1019         'arvados_task' => $arvados_task
1020       };
1021       push @jobstep, $jobstep;
1022       push @jobstep_todo, $#jobstep;
1023     }
1024   }
1025
1026   $progress_is_dirty = 1;
1027   1;
1028 }
1029
1030 sub check_refresh_wanted
1031 {
1032   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1033   if (@stat && $stat[9] > $latest_refresh) {
1034     $latest_refresh = scalar time;
1035     if ($job_has_uuid) {
1036       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1037       for my $attr ('cancelled_at',
1038                     'cancelled_by_user_uuid',
1039                     'cancelled_by_client_uuid',
1040                     'state') {
1041         $Job->{$attr} = $Job2->{$attr};
1042       }
1043       if ($Job->{'state'} ne "Running") {
1044         if ($Job->{'state'} eq "Cancelled") {
1045           Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1046         } else {
1047           Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1048         }
1049         $main::success = 0;
1050         $main::please_freeze = 1;
1051       }
1052     }
1053   }
1054 }
1055
1056 sub check_squeue
1057 {
1058   # return if the kill list was checked <4 seconds ago
1059   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1060   {
1061     return;
1062   }
1063   $squeue_kill_checked = time;
1064
1065   # use killem() on procs whose killtime is reached
1066   for (keys %proc)
1067   {
1068     if (exists $proc{$_}->{killtime}
1069         && $proc{$_}->{killtime} <= time)
1070     {
1071       killem ($_);
1072     }
1073   }
1074
1075   # return if the squeue was checked <60 seconds ago
1076   if (defined $squeue_checked && $squeue_checked > time - 60)
1077   {
1078     return;
1079   }
1080   $squeue_checked = time;
1081
1082   if (!$have_slurm)
1083   {
1084     # here is an opportunity to check for mysterious problems with local procs
1085     return;
1086   }
1087
1088   # get a list of steps still running
1089   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1090   chop @squeue;
1091   if ($squeue[-1] ne "ok")
1092   {
1093     return;
1094   }
1095   pop @squeue;
1096
1097   # which of my jobsteps are running, according to squeue?
1098   my %ok;
1099   foreach (@squeue)
1100   {
1101     if (/^(\d+)\.(\d+) (\S+)/)
1102     {
1103       if ($1 eq $ENV{SLURM_JOBID})
1104       {
1105         $ok{$3} = 1;
1106       }
1107     }
1108   }
1109
1110   # which of my active child procs (>60s old) were not mentioned by squeue?
1111   foreach (keys %proc)
1112   {
1113     if ($proc{$_}->{time} < time - 60
1114         && !exists $ok{$proc{$_}->{jobstepname}}
1115         && !exists $proc{$_}->{killtime})
1116     {
1117       # kill this proc if it hasn't exited in 30 seconds
1118       $proc{$_}->{killtime} = time + 30;
1119     }
1120   }
1121 }
1122
1123
1124 sub release_allocation
1125 {
1126   if ($have_slurm)
1127   {
1128     Log (undef, "release job allocation");
1129     system "scancel $ENV{SLURM_JOBID}";
1130   }
1131 }
1132
1133
1134 sub readfrompipes
1135 {
1136   my $gotsome = 0;
1137   foreach my $job (keys %reader)
1138   {
1139     my $buf;
1140     while (0 < sysread ($reader{$job}, $buf, 8192))
1141     {
1142       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1143       $jobstep[$job]->{stderr} .= $buf;
1144       preprocess_stderr ($job);
1145       if (length ($jobstep[$job]->{stderr}) > 16384)
1146       {
1147         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1148       }
1149       $gotsome = 1;
1150     }
1151   }
1152   return $gotsome;
1153 }
1154
1155
1156 sub preprocess_stderr
1157 {
1158   my $job = shift;
1159
1160   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1161     my $line = $1;
1162     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1163     Log ($job, "stderr $line");
1164     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1165       # whoa.
1166       $main::please_freeze = 1;
1167     }
1168     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1169       $jobstep[$job]->{node_fail} = 1;
1170       ban_node_by_slot($jobstep[$job]->{slotindex});
1171     }
1172   }
1173 }
1174
1175
1176 sub process_stderr
1177 {
1178   my $job = shift;
1179   my $task_success = shift;
1180   preprocess_stderr ($job);
1181
1182   map {
1183     Log ($job, "stderr $_");
1184   } split ("\n", $jobstep[$job]->{stderr});
1185 }
1186
1187 sub fetch_block
1188 {
1189   my $hash = shift;
1190   my ($keep, $child_out, $output_block);
1191
1192   my $cmd = "arv-get \Q$hash\E";
1193   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1194   $output_block = '';
1195   while (1) {
1196     my $buf;
1197     my $bytes = sysread($keep, $buf, 1024 * 1024);
1198     if (!defined $bytes) {
1199       die "reading from arv-get: $!";
1200     } elsif ($bytes == 0) {
1201       # sysread returns 0 at the end of the pipe.
1202       last;
1203     } else {
1204       # some bytes were read into buf.
1205       $output_block .= $buf;
1206     }
1207   }
1208   close $keep;
1209   return $output_block;
1210 }
1211
1212 sub collate_output
1213 {
1214   Log (undef, "collate");
1215
1216   my ($child_out, $child_in);
1217   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1218                   '--retries', put_retry_count());
1219   my $joboutput;
1220   for (@jobstep)
1221   {
1222     next if (!exists $_->{'arvados_task'}->{'output'} ||
1223              !$_->{'arvados_task'}->{'success'});
1224     my $output = $_->{'arvados_task'}->{output};
1225     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1226     {
1227       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1228       print $child_in $output;
1229     }
1230     elsif (@jobstep == 1)
1231     {
1232       $joboutput = $output;
1233       last;
1234     }
1235     elsif (defined (my $outblock = fetch_block ($output)))
1236     {
1237       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1238       print $child_in $outblock;
1239     }
1240     else
1241     {
1242       Log (undef, "XXX fetch_block($output) failed XXX");
1243       $main::success = 0;
1244     }
1245   }
1246   $child_in->close;
1247
1248   if (!defined $joboutput) {
1249     my $s = IO::Select->new($child_out);
1250     if ($s->can_read(120)) {
1251       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1252       chomp($joboutput);
1253     } else {
1254       Log (undef, "timed out reading from 'arv-put'");
1255     }
1256   }
1257   waitpid($pid, 0);
1258
1259   return $joboutput;
1260 }
1261
1262
1263 sub killem
1264 {
1265   foreach (@_)
1266   {
1267     my $sig = 2;                # SIGINT first
1268     if (exists $proc{$_}->{"sent_$sig"} &&
1269         time - $proc{$_}->{"sent_$sig"} > 4)
1270     {
1271       $sig = 15;                # SIGTERM if SIGINT doesn't work
1272     }
1273     if (exists $proc{$_}->{"sent_$sig"} &&
1274         time - $proc{$_}->{"sent_$sig"} > 4)
1275     {
1276       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1277     }
1278     if (!exists $proc{$_}->{"sent_$sig"})
1279     {
1280       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1281       kill $sig, $_;
1282       select (undef, undef, undef, 0.1);
1283       if ($sig == 2)
1284       {
1285         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1286       }
1287       $proc{$_}->{"sent_$sig"} = time;
1288       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1289     }
1290   }
1291 }
1292
1293
1294 sub fhbits
1295 {
1296   my($bits);
1297   for (@_) {
1298     vec($bits,fileno($_),1) = 1;
1299   }
1300   $bits;
1301 }
1302
1303
1304 # Output logging to arv-put.
1305 #
1306 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1307 # $log_pipe_pid is the pid of the arv-put subprocess.
1308 #
1309 # The only functions that should access these variables directly are:
1310 #
1311 # start_output_log($logfilename)
1312 #     Starts an arv-put pipe, reading data on stdin and writing it to
1313 #     a $logfilename file in an output collection.
1314 #
1315 # write_output_log($txt)
1316 #     Writes $txt to the output log collection.
1317 #
1318 # finish_output_log()
1319 #     Closes the arv-put pipe and returns the output that it produces.
1320 #
1321 # output_log_is_active()
1322 #     Returns a true value if there is currently a live arv-put
1323 #     process, false otherwise.
1324 #
1325 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1326
1327 sub start_output_log($)
1328 {
1329   my $logfilename = shift;
1330   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1331                         'arv-put', '--portable-data-hash',
1332                         '--retries', '3',
1333                         '--filename', $logfilename,
1334                         '-');
1335 }
1336
1337 sub write_output_log($)
1338 {
1339   my $txt = shift;
1340   print $log_pipe_in $txt;
1341 }
1342
1343 sub finish_output_log()
1344 {
1345   return unless $log_pipe_pid;
1346
1347   close($log_pipe_in);
1348   my $arv_put_output;
1349
1350   my $s = IO::Select->new($log_pipe_out);
1351   if ($s->can_read(120)) {
1352     sysread($log_pipe_out, $arv_put_output, 1024);
1353     chomp($arv_put_output);
1354   } else {
1355     Log (undef, "timed out reading from 'arv-put'");
1356   }
1357
1358   waitpid($log_pipe_pid, 0);
1359   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1360   if ($?) {
1361     Log("finish_output_log: arv-put returned error $?")
1362   }
1363
1364   return $arv_put_output;
1365 }
1366
1367 sub output_log_is_active() {
1368   return $log_pipe_pid
1369 }
1370
1371 sub Log                         # ($jobstep_id, $logmessage)
1372 {
1373   if ($_[1] =~ /\n/) {
1374     for my $line (split (/\n/, $_[1])) {
1375       Log ($_[0], $line);
1376     }
1377     return;
1378   }
1379   my $fh = select STDERR; $|=1; select $fh;
1380   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1381   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1382   $message .= "\n";
1383   my $datetime;
1384   if (output_log_is_active() || -t STDERR) {
1385     my @gmtime = gmtime;
1386     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1387                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1388   }
1389   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1390
1391   if (output_log_is_active()) {
1392     write_output_log($datetime . " " . $message);
1393   }
1394 }
1395
1396
1397 sub croak
1398 {
1399   my ($package, $file, $line) = caller;
1400   my $message = "@_ at $file line $line\n";
1401   Log (undef, $message);
1402   freeze() if @jobstep_todo;
1403   collate_output() if @jobstep_todo;
1404   cleanup();
1405   save_meta() if output_log_is_active();
1406   die;
1407 }
1408
1409
1410 sub cleanup
1411 {
1412   return if !$job_has_uuid;
1413   if ($Job->{'state'} eq 'Cancelled') {
1414     $Job->update_attributes('finished_at' => scalar gmtime);
1415   } else {
1416     $Job->update_attributes('state' => 'Failed');
1417   }
1418 }
1419
1420
1421 sub save_meta
1422 {
1423   my $justcheckpoint = shift; # false if this will be the last meta saved
1424   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1425
1426   my $loglocator = finish_output_log();
1427   Log (undef, "log manifest is $loglocator");
1428   $Job->{'log'} = $loglocator;
1429   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1430 }
1431
1432
1433 sub freeze_if_want_freeze
1434 {
1435   if ($main::please_freeze)
1436   {
1437     release_allocation();
1438     if (@_)
1439     {
1440       # kill some srun procs before freeze+stop
1441       map { $proc{$_} = {} } @_;
1442       while (%proc)
1443       {
1444         killem (keys %proc);
1445         select (undef, undef, undef, 0.1);
1446         my $died;
1447         while (($died = waitpid (-1, WNOHANG)) > 0)
1448         {
1449           delete $proc{$died};
1450         }
1451       }
1452     }
1453     freeze();
1454     collate_output();
1455     cleanup();
1456     save_meta();
1457     exit 0;
1458   }
1459 }
1460
1461
1462 sub freeze
1463 {
1464   Log (undef, "Freeze not implemented");
1465   return;
1466 }
1467
1468
1469 sub thaw
1470 {
1471   croak ("Thaw not implemented");
1472 }
1473
1474
1475 sub freezequote
1476 {
1477   my $s = shift;
1478   $s =~ s/\\/\\\\/g;
1479   $s =~ s/\n/\\n/g;
1480   return $s;
1481 }
1482
1483
1484 sub freezeunquote
1485 {
1486   my $s = shift;
1487   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1488   return $s;
1489 }
1490
1491
1492 sub srun
1493 {
1494   my $srunargs = shift;
1495   my $execargs = shift;
1496   my $opts = shift || {};
1497   my $stdin = shift;
1498   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1499   print STDERR (join (" ",
1500                       map { / / ? "'$_'" : $_ }
1501                       (@$args)),
1502                 "\n")
1503       if $ENV{CRUNCH_DEBUG};
1504
1505   if (defined $stdin) {
1506     my $child = open STDIN, "-|";
1507     defined $child or die "no fork: $!";
1508     if ($child == 0) {
1509       print $stdin or die $!;
1510       close STDOUT or die $!;
1511       exit 0;
1512     }
1513   }
1514
1515   return system (@$args) if $opts->{fork};
1516
1517   exec @$args;
1518   warn "ENV size is ".length(join(" ",%ENV));
1519   die "exec failed: $!: @$args";
1520 }
1521
1522
1523 sub ban_node_by_slot {
1524   # Don't start any new jobsteps on this node for 60 seconds
1525   my $slotid = shift;
1526   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1527   $slot[$slotid]->{node}->{hold_count}++;
1528   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1529 }
1530
1531 sub must_lock_now
1532 {
1533   my ($lockfile, $error_message) = @_;
1534   open L, ">", $lockfile or croak("$lockfile: $!");
1535   if (!flock L, LOCK_EX|LOCK_NB) {
1536     croak("Can't lock $lockfile: $error_message\n");
1537   }
1538 }
1539
1540 sub find_docker_image {
1541   # Given a Keep locator, check to see if it contains a Docker image.
1542   # If so, return its stream name and Docker hash.
1543   # If not, return undef for both values.
1544   my $locator = shift;
1545   my ($streamname, $filename);
1546   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1547     foreach my $line (split(/\n/, $image->{manifest_text})) {
1548       my @tokens = split(/\s+/, $line);
1549       next if (!@tokens);
1550       $streamname = shift(@tokens);
1551       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1552         if (defined($filename)) {
1553           return (undef, undef);  # More than one file in the Collection.
1554         } else {
1555           $filename = (split(/:/, $filedata, 3))[2];
1556         }
1557       }
1558     }
1559   }
1560   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1561     return ($streamname, $1);
1562   } else {
1563     return (undef, undef);
1564   }
1565 }
1566
1567 sub put_retry_count {
1568   # Calculate a --retries argument for arv-put that will have it try
1569   # approximately as long as this Job has been running.
1570   my $stoptime = shift || time;
1571   my $starttime = $jobstep[0]->{starttime};
1572   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1573   my $retries = 0;
1574   while ($timediff >= 2) {
1575     $retries++;
1576     $timediff /= 2;
1577   }
1578   return ($retries > 3) ? $retries : 3;
1579 }
1580
1581 __DATA__
1582 #!/usr/bin/perl
1583
1584 # checkout-and-build
1585
1586 use Fcntl ':flock';
1587 use File::Path qw( make_path );
1588
1589 my $destdir = $ENV{"CRUNCH_SRC"};
1590 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1591 my $repo = $ENV{"CRUNCH_SRC_URL"};
1592 my $task_work = $ENV{"TASK_WORK"};
1593
1594 for my $dir ($destdir, $task_work) {
1595     if ($dir) {
1596         make_path $dir;
1597         -e $dir or die "Failed to create temporary directory ($dir): $!";
1598     }
1599 }
1600
1601 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1602 flock L, LOCK_EX;
1603 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1604     if (@ARGV) {
1605         exec(@ARGV);
1606         die "Cannot exec `@ARGV`: $!";
1607     } else {
1608         exit 0;
1609     }
1610 }
1611
1612 unlink "$destdir.commit";
1613 open STDOUT, ">", "$destdir.log";
1614 open STDERR, ">&STDOUT";
1615
1616 mkdir $destdir;
1617 my @git_archive_data = <DATA>;
1618 if (@git_archive_data) {
1619   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1620   print TARX @git_archive_data;
1621   if(!close(TARX)) {
1622     die "'tar -C $destdir -xf -' exited $?: $!";
1623   }
1624 }
1625
1626 my $pwd;
1627 chomp ($pwd = `pwd`);
1628 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1629 mkdir $install_dir;
1630
1631 for my $src_path ("$destdir/arvados/sdk/python") {
1632   if (-d $src_path) {
1633     shell_or_die ("virtualenv", $install_dir);
1634     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1635   }
1636 }
1637
1638 if (-e "$destdir/crunch_scripts/install") {
1639     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1640 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1641     # Old version
1642     shell_or_die ("./tests/autotests.sh", $install_dir);
1643 } elsif (-e "./install.sh") {
1644     shell_or_die ("./install.sh", $install_dir);
1645 }
1646
1647 if ($commit) {
1648     unlink "$destdir.commit.new";
1649     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1650     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1651 }
1652
1653 close L;
1654
1655 if (@ARGV) {
1656     exec(@ARGV);
1657     die "Cannot exec `@ARGV`: $!";
1658 } else {
1659     exit 0;
1660 }
1661
1662 sub shell_or_die
1663 {
1664   if ($ENV{"DEBUG"}) {
1665     print STDERR "@_\n";
1666   }
1667   system (@_) == 0
1668       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1669 }
1670
1671 __DATA__