Merge branch '3782-stub-io-pipe' refs #3782
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
130
131
132 $SIG{'USR1'} = sub
133 {
134   $main::ENV{CRUNCH_DEBUG} = 1;
135 };
136 $SIG{'USR2'} = sub
137 {
138   $main::ENV{CRUNCH_DEBUG} = 0;
139 };
140
141
142
143 my $arv = Arvados->new('apiVersion' => 'v1');
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # Claim this job, and make sure nobody else does
156     eval {
157       # lock() sets is_locked_by_uuid and changes state to Running.
158       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
159     };
160     if ($@) {
161       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
162       exit EX_TEMPFAIL;
163     };
164   }
165 }
166 else
167 {
168   $Job = JSON::decode_json($jobspec);
169
170   if (!$resume_stash)
171   {
172     map { croak ("No $_ specified") unless $Job->{$_} }
173     qw(script script_version script_parameters);
174   }
175
176   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
177   $Job->{'started_at'} = gmtime;
178
179   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
180
181   $job_has_uuid = 1;
182 }
183 $job_id = $Job->{'uuid'};
184
185 my $keep_logfile = $job_id . '.log.txt';
186 log_writer_start($keep_logfile);
187
188 $Job->{'runtime_constraints'} ||= {};
189 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
190 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
191
192
193 Log (undef, "check slurm allocation");
194 my @slot;
195 my @node;
196 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
197 my @sinfo;
198 if (!$have_slurm)
199 {
200   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
201   push @sinfo, "$localcpus localhost";
202 }
203 if (exists $ENV{SLURM_NODELIST})
204 {
205   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
206 }
207 foreach (@sinfo)
208 {
209   my ($ncpus, $slurm_nodelist) = split;
210   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
211
212   my @nodelist;
213   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
214   {
215     my $nodelist = $1;
216     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
217     {
218       my $ranges = $1;
219       foreach (split (",", $ranges))
220       {
221         my ($a, $b);
222         if (/(\d+)-(\d+)/)
223         {
224           $a = $1;
225           $b = $2;
226         }
227         else
228         {
229           $a = $_;
230           $b = $_;
231         }
232         push @nodelist, map {
233           my $n = $nodelist;
234           $n =~ s/\[[-,\d]+\]/$_/;
235           $n;
236         } ($a..$b);
237       }
238     }
239     else
240     {
241       push @nodelist, $nodelist;
242     }
243   }
244   foreach my $nodename (@nodelist)
245   {
246     Log (undef, "node $nodename - $ncpus slots");
247     my $node = { name => $nodename,
248                  ncpus => $ncpus,
249                  losing_streak => 0,
250                  hold_until => 0 };
251     foreach my $cpu (1..$ncpus)
252     {
253       push @slot, { node => $node,
254                     cpu => $cpu };
255     }
256   }
257   push @node, @nodelist;
258 }
259
260
261
262 # Ensure that we get one jobstep running on each allocated node before
263 # we start overloading nodes with concurrent steps
264
265 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
266
267
268 $Job->update_attributes(
269   'tasks_summary' => { 'failed' => 0,
270                        'todo' => 1,
271                        'running' => 0,
272                        'done' => 0 });
273
274 Log (undef, "start");
275 $SIG{'INT'} = sub { $main::please_freeze = 1; };
276 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
277 $SIG{'TERM'} = \&croak;
278 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
279 $SIG{'ALRM'} = sub { $main::please_info = 1; };
280 $SIG{'CONT'} = sub { $main::please_continue = 1; };
281 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
282
283 $main::please_freeze = 0;
284 $main::please_info = 0;
285 $main::please_continue = 0;
286 $main::please_refresh = 0;
287 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
288
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
292
293
294 my @jobstep;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
299 my $squeue_checked;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
302 my $latest_refresh = scalar time;
303
304
305
306 if (defined $Job->{thawedfromkey})
307 {
308   thaw ($Job->{thawedfromkey});
309 }
310 else
311 {
312   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
313     'job_uuid' => $Job->{'uuid'},
314     'sequence' => 0,
315     'qsequence' => 0,
316     'parameters' => {},
317                                                           });
318   push @jobstep, { 'level' => 0,
319                    'failures' => 0,
320                    'arvados_task' => $first_task,
321                  };
322   push @jobstep_todo, 0;
323 }
324
325
326 if (!$have_slurm)
327 {
328   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
329 }
330
331
332 my $build_script;
333
334
335 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
336
337 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
338 if ($skip_install)
339 {
340   if (!defined $no_clear_tmp) {
341     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
342     system($clear_tmp_cmd) == 0
343         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
344   }
345   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
346   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
347     if (-d $src_path) {
348       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
349           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
350       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
351           == 0
352           or croak ("setup.py in $src_path failed: exit ".($?>>8));
353     }
354   }
355 }
356 else
357 {
358   do {
359     local $/ = undef;
360     $build_script = <DATA>;
361   };
362   Log (undef, "Install revision ".$Job->{script_version});
363   my $nodelist = join(",", @node);
364
365   if (!defined $no_clear_tmp) {
366     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
367
368     my $cleanpid = fork();
369     if ($cleanpid == 0)
370     {
371       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
372             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
373       exit (1);
374     }
375     while (1)
376     {
377       last if $cleanpid == waitpid (-1, WNOHANG);
378       freeze_if_want_freeze ($cleanpid);
379       select (undef, undef, undef, 0.1);
380     }
381     Log (undef, "Clean-work-dir exited $?");
382   }
383
384   # Install requested code version
385
386   my @execargs;
387   my @srunargs = ("srun",
388                   "--nodelist=$nodelist",
389                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
390
391   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
392   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
393
394   my $commit;
395   my $git_archive;
396   my $treeish = $Job->{'script_version'};
397
398   # If we're running under crunch-dispatch, it will have pulled the
399   # appropriate source tree into its own repository, and given us that
400   # repo's path as $git_dir. If we're running a "local" job, and a
401   # script_version was specified, it's up to the user to provide the
402   # full path to a local repository in Job->{repository}.
403   #
404   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
405   # git-archive --remote where appropriate.
406   #
407   # TODO: Accept a locally-hosted Arvados repository by name or
408   # UUID. Use arvados.v1.repositories.list or .get to figure out the
409   # appropriate fetch-url.
410   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
411
412   $ENV{"CRUNCH_SRC_URL"} = $repo;
413
414   if (-d "$repo/.git") {
415     # We were given a working directory, but we are only interested in
416     # the index.
417     $repo = "$repo/.git";
418   }
419
420   # If this looks like a subversion r#, look for it in git-svn commit messages
421
422   if ($treeish =~ m{^\d{1,4}$}) {
423     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
424     chomp $gitlog;
425     Log(undef, "git Subversion search exited $?");
426     if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
427       $commit = $gitlog;
428       Log(undef, "Using commit $commit for Subversion revision $treeish");
429     }
430   }
431
432   # If that didn't work, try asking git to look it up as a tree-ish.
433
434   if (!defined $commit) {
435     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
436     chomp $found;
437     Log(undef, "git rev-list exited $? with result '$found'");
438     if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
439       $commit = $found;
440       Log(undef, "Using commit $commit for tree-ish $treeish");
441       if ($commit ne $treeish) {
442         # Make sure we record the real commit id in the database,
443         # frozentokey, logs, etc. -- instead of an abbreviation or a
444         # branch name which can become ambiguous or point to a
445         # different commit in the future.
446         $Job->{'script_version'} = $commit;
447         !$job_has_uuid or
448             $Job->update_attributes('script_version' => $commit) or
449             croak("Error while updating job");
450       }
451     }
452   }
453
454   if (defined $commit) {
455     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
456     @execargs = ("sh", "-c",
457                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
458     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
459     croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
460   }
461   else {
462     croak ("could not figure out commit id for $treeish");
463   }
464
465   # Note: this section is almost certainly unnecessary if we're
466   # running tasks in docker containers.
467   my $installpid = fork();
468   if ($installpid == 0)
469   {
470     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
471     exit (1);
472   }
473   while (1)
474   {
475     last if $installpid == waitpid (-1, WNOHANG);
476     freeze_if_want_freeze ($installpid);
477     select (undef, undef, undef, 0.1);
478   }
479   Log (undef, "Install exited $?");
480 }
481
482 if (!$have_slurm)
483 {
484   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
485   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
486 }
487
488 # If this job requires a Docker image, install that.
489 my $docker_bin = "/usr/bin/docker.io";
490 my ($docker_locator, $docker_stream, $docker_hash);
491 if ($docker_locator = $Job->{docker_image_locator}) {
492   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
493   if (!$docker_hash)
494   {
495     croak("No Docker image hash found from locator $docker_locator");
496   }
497   $docker_stream =~ s/^\.//;
498   my $docker_install_script = qq{
499 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
500     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
501 fi
502 };
503   my $docker_pid = fork();
504   if ($docker_pid == 0)
505   {
506     srun (["srun", "--nodelist=" . join(',', @node)],
507           ["/bin/sh", "-ec", $docker_install_script]);
508     exit ($?);
509   }
510   while (1)
511   {
512     last if $docker_pid == waitpid (-1, WNOHANG);
513     freeze_if_want_freeze ($docker_pid);
514     select (undef, undef, undef, 0.1);
515   }
516   if ($? != 0)
517   {
518     croak("Installing Docker image from $docker_locator returned exit code $?");
519   }
520 }
521
522 foreach (qw (script script_version script_parameters runtime_constraints))
523 {
524   Log (undef,
525        "$_ " .
526        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
527 }
528 foreach (split (/\n/, $Job->{knobs}))
529 {
530   Log (undef, "knob " . $_);
531 }
532
533
534
535 $main::success = undef;
536
537
538
539 ONELEVEL:
540
541 my $thisround_succeeded = 0;
542 my $thisround_failed = 0;
543 my $thisround_failed_multiple = 0;
544
545 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
546                        or $a <=> $b } @jobstep_todo;
547 my $level = $jobstep[$jobstep_todo[0]]->{level};
548 Log (undef, "start level $level");
549
550
551
552 my %proc;
553 my @freeslot = (0..$#slot);
554 my @holdslot;
555 my %reader;
556 my $progress_is_dirty = 1;
557 my $progress_stats_updated = 0;
558
559 update_progress_stats();
560
561
562
563 THISROUND:
564 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
565 {
566   my $id = $jobstep_todo[$todo_ptr];
567   my $Jobstep = $jobstep[$id];
568   if ($Jobstep->{level} != $level)
569   {
570     next;
571   }
572
573   pipe $reader{$id}, "writer" or croak ($!);
574   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
575   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
576
577   my $childslot = $freeslot[0];
578   my $childnode = $slot[$childslot]->{node};
579   my $childslotname = join (".",
580                             $slot[$childslot]->{node}->{name},
581                             $slot[$childslot]->{cpu});
582   my $childpid = fork();
583   if ($childpid == 0)
584   {
585     $SIG{'INT'} = 'DEFAULT';
586     $SIG{'QUIT'} = 'DEFAULT';
587     $SIG{'TERM'} = 'DEFAULT';
588
589     foreach (values (%reader))
590     {
591       close($_);
592     }
593     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
594     open(STDOUT,">&writer");
595     open(STDERR,">&writer");
596
597     undef $dbh;
598     undef $sth;
599
600     delete $ENV{"GNUPGHOME"};
601     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
602     $ENV{"TASK_QSEQUENCE"} = $id;
603     $ENV{"TASK_SEQUENCE"} = $level;
604     $ENV{"JOB_SCRIPT"} = $Job->{script};
605     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
606       $param =~ tr/a-z/A-Z/;
607       $ENV{"JOB_PARAMETER_$param"} = $value;
608     }
609     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
610     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
611     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
612     $ENV{"HOME"} = $ENV{"TASK_WORK"};
613     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
614     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
615     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
616     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
617
618     $ENV{"GZIP"} = "-n";
619
620     my @srunargs = (
621       "srun",
622       "--nodelist=".$childnode->{name},
623       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
624       "--job-name=$job_id.$id.$$",
625         );
626     my $build_script_to_send = "";
627     my $command =
628         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
629         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
630         ."&& cd $ENV{CRUNCH_TMP} ";
631     if ($build_script)
632     {
633       $build_script_to_send = $build_script;
634       $command .=
635           "&& perl -";
636     }
637     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
638     if ($docker_hash)
639     {
640       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
641       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
642       # Dynamically configure the container to use the host system as its
643       # DNS server.  Get the host's global addresses from the ip command,
644       # and turn them into docker --dns options using gawk.
645       $command .=
646           q{$(ip -o address show scope global |
647               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
648       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
649       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
650       $command .= "--env=\QHOME=/home/crunch\E ";
651       while (my ($env_key, $env_val) = each %ENV)
652       {
653         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
654           if ($env_key eq "TASK_WORK") {
655             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
656           }
657           elsif ($env_key eq "TASK_KEEPMOUNT") {
658             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
659           }
660           else {
661             $command .= "--env=\Q$env_key=$env_val\E ";
662           }
663         }
664       }
665       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
666       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
667       $command .= "\Q$docker_hash\E ";
668       $command .= "stdbuf --output=0 --error=0 ";
669       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
670     } else {
671       # Non-docker run
672       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
673       $command .= "stdbuf --output=0 --error=0 ";
674       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
675     }
676
677     my @execargs = ('bash', '-c', $command);
678     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
679     # exec() failed, we assume nothing happened.
680     Log(undef, "srun() failed on build script");
681     die;
682   }
683   close("writer");
684   if (!defined $childpid)
685   {
686     close $reader{$id};
687     delete $reader{$id};
688     next;
689   }
690   shift @freeslot;
691   $proc{$childpid} = { jobstep => $id,
692                        time => time,
693                        slot => $childslot,
694                        jobstepname => "$job_id.$id.$childpid",
695                      };
696   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
697   $slot[$childslot]->{pid} = $childpid;
698
699   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
700   Log ($id, "child $childpid started on $childslotname");
701   $Jobstep->{starttime} = time;
702   $Jobstep->{node} = $childnode->{name};
703   $Jobstep->{slotindex} = $childslot;
704   delete $Jobstep->{stderr};
705   delete $Jobstep->{finishtime};
706
707   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
708   $Jobstep->{'arvados_task'}->save;
709
710   splice @jobstep_todo, $todo_ptr, 1;
711   --$todo_ptr;
712
713   $progress_is_dirty = 1;
714
715   while (!@freeslot
716          ||
717          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
718   {
719     last THISROUND if $main::please_freeze;
720     if ($main::please_info)
721     {
722       $main::please_info = 0;
723       freeze();
724       collate_output();
725       save_meta(1);
726       update_progress_stats();
727     }
728     my $gotsome
729         = readfrompipes ()
730         + reapchildren ();
731     if (!$gotsome)
732     {
733       check_refresh_wanted();
734       check_squeue();
735       update_progress_stats();
736       select (undef, undef, undef, 0.1);
737     }
738     elsif (time - $progress_stats_updated >= 30)
739     {
740       update_progress_stats();
741     }
742     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
743         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
744     {
745       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
746           .($thisround_failed+$thisround_succeeded)
747           .") -- giving up on this round";
748       Log (undef, $message);
749       last THISROUND;
750     }
751
752     # move slots from freeslot to holdslot (or back to freeslot) if necessary
753     for (my $i=$#freeslot; $i>=0; $i--) {
754       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
755         push @holdslot, (splice @freeslot, $i, 1);
756       }
757     }
758     for (my $i=$#holdslot; $i>=0; $i--) {
759       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
760         push @freeslot, (splice @holdslot, $i, 1);
761       }
762     }
763
764     # give up if no nodes are succeeding
765     if (!grep { $_->{node}->{losing_streak} == 0 &&
766                     $_->{node}->{hold_count} < 4 } @slot) {
767       my $message = "Every node has failed -- giving up on this round";
768       Log (undef, $message);
769       last THISROUND;
770     }
771   }
772 }
773
774
775 push @freeslot, splice @holdslot;
776 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
777
778
779 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
780 while (%proc)
781 {
782   if ($main::please_continue) {
783     $main::please_continue = 0;
784     goto THISROUND;
785   }
786   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
787   readfrompipes ();
788   if (!reapchildren())
789   {
790     check_refresh_wanted();
791     check_squeue();
792     update_progress_stats();
793     select (undef, undef, undef, 0.1);
794     killem (keys %proc) if $main::please_freeze;
795   }
796 }
797
798 update_progress_stats();
799 freeze_if_want_freeze();
800
801
802 if (!defined $main::success)
803 {
804   if (@jobstep_todo &&
805       $thisround_succeeded == 0 &&
806       ($thisround_failed == 0 || $thisround_failed > 4))
807   {
808     my $message = "stop because $thisround_failed tasks failed and none succeeded";
809     Log (undef, $message);
810     $main::success = 0;
811   }
812   if (!@jobstep_todo)
813   {
814     $main::success = 1;
815   }
816 }
817
818 goto ONELEVEL if !defined $main::success;
819
820
821 release_allocation();
822 freeze();
823 my $collated_output = &collate_output();
824
825 if (!$collated_output) {
826   Log(undef, "output undef");
827 }
828 else {
829   eval {
830     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
831         or die "failed to get collated manifest: $!";
832     my $orig_manifest_text = '';
833     while (my $manifest_line = <$orig_manifest>) {
834       $orig_manifest_text .= $manifest_line;
835     }
836     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
837       'manifest_text' => $orig_manifest_text,
838     });
839     Log(undef, "output uuid " . $output->{uuid});
840     Log(undef, "output hash " . $output->{portable_data_hash});
841     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
842   };
843   if ($@) {
844     Log (undef, "Failed to register output manifest: $@");
845   }
846 }
847
848 Log (undef, "finish");
849
850 save_meta();
851
852 if ($job_has_uuid) {
853   if ($collated_output && $main::success) {
854     $Job->update_attributes('state' => 'Complete')
855   } else {
856     $Job->update_attributes('state' => 'Failed')
857   }
858 }
859
860 exit ($Job->{'state'} != 'Complete' ? 1 : 0);
861
862
863
864 sub update_progress_stats
865 {
866   $progress_stats_updated = time;
867   return if !$progress_is_dirty;
868   my ($todo, $done, $running) = (scalar @jobstep_todo,
869                                  scalar @jobstep_done,
870                                  scalar @slot - scalar @freeslot - scalar @holdslot);
871   $Job->{'tasks_summary'} ||= {};
872   $Job->{'tasks_summary'}->{'todo'} = $todo;
873   $Job->{'tasks_summary'}->{'done'} = $done;
874   $Job->{'tasks_summary'}->{'running'} = $running;
875   if ($job_has_uuid) {
876     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
877   }
878   Log (undef, "status: $done done, $running running, $todo todo");
879   $progress_is_dirty = 0;
880 }
881
882
883
884 sub reapchildren
885 {
886   my $pid = waitpid (-1, WNOHANG);
887   return 0 if $pid <= 0;
888
889   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
890                   . "."
891                   . $slot[$proc{$pid}->{slot}]->{cpu});
892   my $jobstepid = $proc{$pid}->{jobstep};
893   my $elapsed = time - $proc{$pid}->{time};
894   my $Jobstep = $jobstep[$jobstepid];
895
896   my $childstatus = $?;
897   my $exitvalue = $childstatus >> 8;
898   my $exitinfo = sprintf("exit %d signal %d%s",
899                          $exitvalue,
900                          $childstatus & 127,
901                          ($childstatus & 128 ? ' core dump' : ''));
902   $Jobstep->{'arvados_task'}->reload;
903   my $task_success = $Jobstep->{'arvados_task'}->{success};
904
905   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
906
907   if (!defined $task_success) {
908     # task did not indicate one way or the other --> fail
909     $Jobstep->{'arvados_task'}->{success} = 0;
910     $Jobstep->{'arvados_task'}->save;
911     $task_success = 0;
912   }
913
914   if (!$task_success)
915   {
916     my $temporary_fail;
917     $temporary_fail ||= $Jobstep->{node_fail};
918     $temporary_fail ||= ($exitvalue == 111);
919
920     ++$thisround_failed;
921     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
922
923     # Check for signs of a failed or misconfigured node
924     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
925         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
926       # Don't count this against jobstep failure thresholds if this
927       # node is already suspected faulty and srun exited quickly
928       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
929           $elapsed < 5) {
930         Log ($jobstepid, "blaming failure on suspect node " .
931              $slot[$proc{$pid}->{slot}]->{node}->{name});
932         $temporary_fail ||= 1;
933       }
934       ban_node_by_slot($proc{$pid}->{slot});
935     }
936
937     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
938                              ++$Jobstep->{'failures'},
939                              $temporary_fail ? 'temporary ' : 'permanent',
940                              $elapsed));
941
942     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
943       # Give up on this task, and the whole job
944       $main::success = 0;
945       $main::please_freeze = 1;
946     }
947     else {
948       # Put this task back on the todo queue
949       push @jobstep_todo, $jobstepid;
950     }
951     $Job->{'tasks_summary'}->{'failed'}++;
952   }
953   else
954   {
955     ++$thisround_succeeded;
956     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
957     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
958     push @jobstep_done, $jobstepid;
959     Log ($jobstepid, "success in $elapsed seconds");
960   }
961   $Jobstep->{exitcode} = $childstatus;
962   $Jobstep->{finishtime} = time;
963   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
964   $Jobstep->{'arvados_task'}->save;
965   process_stderr ($jobstepid, $task_success);
966   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
967
968   close $reader{$jobstepid};
969   delete $reader{$jobstepid};
970   delete $slot[$proc{$pid}->{slot}]->{pid};
971   push @freeslot, $proc{$pid}->{slot};
972   delete $proc{$pid};
973
974   if ($task_success) {
975     # Load new tasks
976     my $newtask_list = [];
977     my $newtask_results;
978     do {
979       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
980         'where' => {
981           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
982         },
983         'order' => 'qsequence',
984         'offset' => scalar(@$newtask_list),
985       );
986       push(@$newtask_list, @{$newtask_results->{items}});
987     } while (@{$newtask_results->{items}});
988     foreach my $arvados_task (@$newtask_list) {
989       my $jobstep = {
990         'level' => $arvados_task->{'sequence'},
991         'failures' => 0,
992         'arvados_task' => $arvados_task
993       };
994       push @jobstep, $jobstep;
995       push @jobstep_todo, $#jobstep;
996     }
997   }
998
999   $progress_is_dirty = 1;
1000   1;
1001 }
1002
1003 sub check_refresh_wanted
1004 {
1005   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1006   if (@stat && $stat[9] > $latest_refresh) {
1007     $latest_refresh = scalar time;
1008     if ($job_has_uuid) {
1009       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1010       for my $attr ('cancelled_at',
1011                     'cancelled_by_user_uuid',
1012                     'cancelled_by_client_uuid',
1013                     'state') {
1014         $Job->{$attr} = $Job2->{$attr};
1015       }
1016       if ($Job->{'state'} ne "Running") {
1017         if ($Job->{'state'} eq "Cancelled") {
1018           Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1019         } else {
1020           Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1021         }
1022         $main::success = 0;
1023         $main::please_freeze = 1;
1024       }
1025     }
1026   }
1027 }
1028
1029 sub check_squeue
1030 {
1031   # return if the kill list was checked <4 seconds ago
1032   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1033   {
1034     return;
1035   }
1036   $squeue_kill_checked = time;
1037
1038   # use killem() on procs whose killtime is reached
1039   for (keys %proc)
1040   {
1041     if (exists $proc{$_}->{killtime}
1042         && $proc{$_}->{killtime} <= time)
1043     {
1044       killem ($_);
1045     }
1046   }
1047
1048   # return if the squeue was checked <60 seconds ago
1049   if (defined $squeue_checked && $squeue_checked > time - 60)
1050   {
1051     return;
1052   }
1053   $squeue_checked = time;
1054
1055   if (!$have_slurm)
1056   {
1057     # here is an opportunity to check for mysterious problems with local procs
1058     return;
1059   }
1060
1061   # get a list of steps still running
1062   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1063   chop @squeue;
1064   if ($squeue[-1] ne "ok")
1065   {
1066     return;
1067   }
1068   pop @squeue;
1069
1070   # which of my jobsteps are running, according to squeue?
1071   my %ok;
1072   foreach (@squeue)
1073   {
1074     if (/^(\d+)\.(\d+) (\S+)/)
1075     {
1076       if ($1 eq $ENV{SLURM_JOBID})
1077       {
1078         $ok{$3} = 1;
1079       }
1080     }
1081   }
1082
1083   # which of my active child procs (>60s old) were not mentioned by squeue?
1084   foreach (keys %proc)
1085   {
1086     if ($proc{$_}->{time} < time - 60
1087         && !exists $ok{$proc{$_}->{jobstepname}}
1088         && !exists $proc{$_}->{killtime})
1089     {
1090       # kill this proc if it hasn't exited in 30 seconds
1091       $proc{$_}->{killtime} = time + 30;
1092     }
1093   }
1094 }
1095
1096
1097 sub release_allocation
1098 {
1099   if ($have_slurm)
1100   {
1101     Log (undef, "release job allocation");
1102     system "scancel $ENV{SLURM_JOBID}";
1103   }
1104 }
1105
1106
1107 sub readfrompipes
1108 {
1109   my $gotsome = 0;
1110   foreach my $job (keys %reader)
1111   {
1112     my $buf;
1113     while (0 < sysread ($reader{$job}, $buf, 8192))
1114     {
1115       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1116       $jobstep[$job]->{stderr} .= $buf;
1117       preprocess_stderr ($job);
1118       if (length ($jobstep[$job]->{stderr}) > 16384)
1119       {
1120         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1121       }
1122       $gotsome = 1;
1123     }
1124   }
1125   return $gotsome;
1126 }
1127
1128
1129 sub preprocess_stderr
1130 {
1131   my $job = shift;
1132
1133   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1134     my $line = $1;
1135     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1136     Log ($job, "stderr $line");
1137     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1138       # whoa.
1139       $main::please_freeze = 1;
1140     }
1141     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1142       $jobstep[$job]->{node_fail} = 1;
1143       ban_node_by_slot($jobstep[$job]->{slotindex});
1144     }
1145   }
1146 }
1147
1148
1149 sub process_stderr
1150 {
1151   my $job = shift;
1152   my $task_success = shift;
1153   preprocess_stderr ($job);
1154
1155   map {
1156     Log ($job, "stderr $_");
1157   } split ("\n", $jobstep[$job]->{stderr});
1158 }
1159
1160 sub fetch_block
1161 {
1162   my $hash = shift;
1163   my ($keep, $child_out, $output_block);
1164
1165   my $cmd = "arv-get \Q$hash\E";
1166   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1167   $output_block = '';
1168   while (1) {
1169     my $buf;
1170     my $bytes = sysread($keep, $buf, 1024 * 1024);
1171     if (!defined $bytes) {
1172       die "reading from arv-get: $!";
1173     } elsif ($bytes == 0) {
1174       # sysread returns 0 at the end of the pipe.
1175       last;
1176     } else {
1177       # some bytes were read into buf.
1178       $output_block .= $buf;
1179     }
1180   }
1181   close $keep;
1182   return $output_block;
1183 }
1184
1185 sub collate_output
1186 {
1187   Log (undef, "collate");
1188
1189   my ($child_out, $child_in);
1190   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1191                   '--retries', put_retry_count());
1192   my $joboutput;
1193   for (@jobstep)
1194   {
1195     next if (!exists $_->{'arvados_task'}->{'output'} ||
1196              !$_->{'arvados_task'}->{'success'});
1197     my $output = $_->{'arvados_task'}->{output};
1198     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1199     {
1200       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1201       print $child_in $output;
1202     }
1203     elsif (@jobstep == 1)
1204     {
1205       $joboutput = $output;
1206       last;
1207     }
1208     elsif (defined (my $outblock = fetch_block ($output)))
1209     {
1210       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1211       print $child_in $outblock;
1212     }
1213     else
1214     {
1215       Log (undef, "XXX fetch_block($output) failed XXX");
1216       $main::success = 0;
1217     }
1218   }
1219   $child_in->close;
1220
1221   if (!defined $joboutput) {
1222     my $s = IO::Select->new($child_out);
1223     if ($s->can_read(120)) {
1224       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1225       chomp($joboutput);
1226     } else {
1227       Log (undef, "timed out reading from 'arv-put'");
1228     }
1229   }
1230   waitpid($pid, 0);
1231
1232   return $joboutput;
1233 }
1234
1235
1236 sub killem
1237 {
1238   foreach (@_)
1239   {
1240     my $sig = 2;                # SIGINT first
1241     if (exists $proc{$_}->{"sent_$sig"} &&
1242         time - $proc{$_}->{"sent_$sig"} > 4)
1243     {
1244       $sig = 15;                # SIGTERM if SIGINT doesn't work
1245     }
1246     if (exists $proc{$_}->{"sent_$sig"} &&
1247         time - $proc{$_}->{"sent_$sig"} > 4)
1248     {
1249       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1250     }
1251     if (!exists $proc{$_}->{"sent_$sig"})
1252     {
1253       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1254       kill $sig, $_;
1255       select (undef, undef, undef, 0.1);
1256       if ($sig == 2)
1257       {
1258         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1259       }
1260       $proc{$_}->{"sent_$sig"} = time;
1261       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1262     }
1263   }
1264 }
1265
1266
1267 sub fhbits
1268 {
1269   my($bits);
1270   for (@_) {
1271     vec($bits,fileno($_),1) = 1;
1272   }
1273   $bits;
1274 }
1275
1276
1277 # Send log output to Keep via arv-put.
1278 #
1279 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1280 # $log_pipe_pid is the pid of the arv-put subprocess.
1281 #
1282 # The only functions that should access these variables directly are:
1283 #
1284 # log_writer_start($logfilename)
1285 #     Starts an arv-put pipe, reading data on stdin and writing it to
1286 #     a $logfilename file in an output collection.
1287 #
1288 # log_writer_send($txt)
1289 #     Writes $txt to the output log collection.
1290 #
1291 # log_writer_finish()
1292 #     Closes the arv-put pipe and returns the output that it produces.
1293 #
1294 # log_writer_is_active()
1295 #     Returns a true value if there is currently a live arv-put
1296 #     process, false otherwise.
1297 #
1298 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1299
1300 sub log_writer_start($)
1301 {
1302   my $logfilename = shift;
1303   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1304                         'arv-put', '--portable-data-hash',
1305                         '--retries', '3',
1306                         '--filename', $logfilename,
1307                         '-');
1308 }
1309
1310 sub log_writer_send($)
1311 {
1312   my $txt = shift;
1313   print $log_pipe_in $txt;
1314 }
1315
1316 sub log_writer_finish()
1317 {
1318   return unless $log_pipe_pid;
1319
1320   close($log_pipe_in);
1321   my $arv_put_output;
1322
1323   my $s = IO::Select->new($log_pipe_out);
1324   if ($s->can_read(120)) {
1325     sysread($log_pipe_out, $arv_put_output, 1024);
1326     chomp($arv_put_output);
1327   } else {
1328     Log (undef, "timed out reading from 'arv-put'");
1329   }
1330
1331   waitpid($log_pipe_pid, 0);
1332   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1333   if ($?) {
1334     Log("log_writer_finish: arv-put returned error $?")
1335   }
1336
1337   return $arv_put_output;
1338 }
1339
1340 sub log_writer_is_active() {
1341   return $log_pipe_pid;
1342 }
1343
1344 sub Log                         # ($jobstep_id, $logmessage)
1345 {
1346   if ($_[1] =~ /\n/) {
1347     for my $line (split (/\n/, $_[1])) {
1348       Log ($_[0], $line);
1349     }
1350     return;
1351   }
1352   my $fh = select STDERR; $|=1; select $fh;
1353   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1354   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1355   $message .= "\n";
1356   my $datetime;
1357   if (log_writer_is_active() || -t STDERR) {
1358     my @gmtime = gmtime;
1359     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1360                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1361   }
1362   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1363
1364   if (log_writer_is_active()) {
1365     log_writer_send($datetime . " " . $message);
1366   }
1367 }
1368
1369
1370 sub croak
1371 {
1372   my ($package, $file, $line) = caller;
1373   my $message = "@_ at $file line $line\n";
1374   Log (undef, $message);
1375   freeze() if @jobstep_todo;
1376   collate_output() if @jobstep_todo;
1377   cleanup();
1378   save_meta() if log_writer_is_active();
1379   die;
1380 }
1381
1382
1383 sub cleanup
1384 {
1385   return if !$job_has_uuid;
1386   if ($Job->{'state'} eq 'Cancelled') {
1387     $Job->update_attributes('finished_at' => scalar gmtime);
1388   } else {
1389     $Job->update_attributes('state' => 'Failed');
1390   }
1391 }
1392
1393
1394 sub save_meta
1395 {
1396   my $justcheckpoint = shift; # false if this will be the last meta saved
1397   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1398
1399   my $loglocator = log_writer_finish();
1400   Log (undef, "log manifest is $loglocator");
1401   $Job->{'log'} = $loglocator;
1402   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1403 }
1404
1405
1406 sub freeze_if_want_freeze
1407 {
1408   if ($main::please_freeze)
1409   {
1410     release_allocation();
1411     if (@_)
1412     {
1413       # kill some srun procs before freeze+stop
1414       map { $proc{$_} = {} } @_;
1415       while (%proc)
1416       {
1417         killem (keys %proc);
1418         select (undef, undef, undef, 0.1);
1419         my $died;
1420         while (($died = waitpid (-1, WNOHANG)) > 0)
1421         {
1422           delete $proc{$died};
1423         }
1424       }
1425     }
1426     freeze();
1427     collate_output();
1428     cleanup();
1429     save_meta();
1430     exit 0;
1431   }
1432 }
1433
1434
1435 sub freeze
1436 {
1437   Log (undef, "Freeze not implemented");
1438   return;
1439 }
1440
1441
1442 sub thaw
1443 {
1444   croak ("Thaw not implemented");
1445 }
1446
1447
1448 sub freezequote
1449 {
1450   my $s = shift;
1451   $s =~ s/\\/\\\\/g;
1452   $s =~ s/\n/\\n/g;
1453   return $s;
1454 }
1455
1456
1457 sub freezeunquote
1458 {
1459   my $s = shift;
1460   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1461   return $s;
1462 }
1463
1464
1465 sub srun
1466 {
1467   my $srunargs = shift;
1468   my $execargs = shift;
1469   my $opts = shift || {};
1470   my $stdin = shift;
1471   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1472   print STDERR (join (" ",
1473                       map { / / ? "'$_'" : $_ }
1474                       (@$args)),
1475                 "\n")
1476       if $ENV{CRUNCH_DEBUG};
1477
1478   if (defined $stdin) {
1479     my $child = open STDIN, "-|";
1480     defined $child or die "no fork: $!";
1481     if ($child == 0) {
1482       print $stdin or die $!;
1483       close STDOUT or die $!;
1484       exit 0;
1485     }
1486   }
1487
1488   return system (@$args) if $opts->{fork};
1489
1490   exec @$args;
1491   warn "ENV size is ".length(join(" ",%ENV));
1492   die "exec failed: $!: @$args";
1493 }
1494
1495
1496 sub ban_node_by_slot {
1497   # Don't start any new jobsteps on this node for 60 seconds
1498   my $slotid = shift;
1499   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1500   $slot[$slotid]->{node}->{hold_count}++;
1501   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1502 }
1503
1504 sub must_lock_now
1505 {
1506   my ($lockfile, $error_message) = @_;
1507   open L, ">", $lockfile or croak("$lockfile: $!");
1508   if (!flock L, LOCK_EX|LOCK_NB) {
1509     croak("Can't lock $lockfile: $error_message\n");
1510   }
1511 }
1512
1513 sub find_docker_image {
1514   # Given a Keep locator, check to see if it contains a Docker image.
1515   # If so, return its stream name and Docker hash.
1516   # If not, return undef for both values.
1517   my $locator = shift;
1518   my ($streamname, $filename);
1519   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1520     foreach my $line (split(/\n/, $image->{manifest_text})) {
1521       my @tokens = split(/\s+/, $line);
1522       next if (!@tokens);
1523       $streamname = shift(@tokens);
1524       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1525         if (defined($filename)) {
1526           return (undef, undef);  # More than one file in the Collection.
1527         } else {
1528           $filename = (split(/:/, $filedata, 3))[2];
1529         }
1530       }
1531     }
1532   }
1533   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1534     return ($streamname, $1);
1535   } else {
1536     return (undef, undef);
1537   }
1538 }
1539
1540 sub put_retry_count {
1541   # Calculate a --retries argument for arv-put that will have it try
1542   # approximately as long as this Job has been running.
1543   my $stoptime = shift || time;
1544   my $starttime = $jobstep[0]->{starttime};
1545   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1546   my $retries = 0;
1547   while ($timediff >= 2) {
1548     $retries++;
1549     $timediff /= 2;
1550   }
1551   return ($retries > 3) ? $retries : 3;
1552 }
1553
1554 __DATA__
1555 #!/usr/bin/perl
1556
1557 # checkout-and-build
1558
1559 use Fcntl ':flock';
1560 use File::Path qw( make_path );
1561
1562 my $destdir = $ENV{"CRUNCH_SRC"};
1563 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1564 my $repo = $ENV{"CRUNCH_SRC_URL"};
1565 my $task_work = $ENV{"TASK_WORK"};
1566
1567 for my $dir ($destdir, $task_work) {
1568     if ($dir) {
1569         make_path $dir;
1570         -e $dir or die "Failed to create temporary directory ($dir): $!";
1571     }
1572 }
1573
1574 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1575 flock L, LOCK_EX;
1576 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1577     if (@ARGV) {
1578         exec(@ARGV);
1579         die "Cannot exec `@ARGV`: $!";
1580     } else {
1581         exit 0;
1582     }
1583 }
1584
1585 unlink "$destdir.commit";
1586 open STDOUT, ">", "$destdir.log";
1587 open STDERR, ">&STDOUT";
1588
1589 mkdir $destdir;
1590 my @git_archive_data = <DATA>;
1591 if (@git_archive_data) {
1592   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1593   print TARX @git_archive_data;
1594   if(!close(TARX)) {
1595     die "'tar -C $destdir -xf -' exited $?: $!";
1596   }
1597 }
1598
1599 my $pwd;
1600 chomp ($pwd = `pwd`);
1601 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1602 mkdir $install_dir;
1603
1604 for my $src_path ("$destdir/arvados/sdk/python") {
1605   if (-d $src_path) {
1606     shell_or_die ("virtualenv", $install_dir);
1607     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1608   }
1609 }
1610
1611 if (-e "$destdir/crunch_scripts/install") {
1612     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1613 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1614     # Old version
1615     shell_or_die ("./tests/autotests.sh", $install_dir);
1616 } elsif (-e "./install.sh") {
1617     shell_or_die ("./install.sh", $install_dir);
1618 }
1619
1620 if ($commit) {
1621     unlink "$destdir.commit.new";
1622     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1623     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1624 }
1625
1626 close L;
1627
1628 if (@ARGV) {
1629     exec(@ARGV);
1630     die "Cannot exec `@ARGV`: $!";
1631 } else {
1632     exit 0;
1633 }
1634
1635 sub shell_or_die
1636 {
1637   if ($ENV{"DEBUG"}) {
1638     print STDERR "@_\n";
1639   }
1640   system (@_) == 0
1641       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1642 }
1643
1644 __DATA__