3187: Record started_at and finished_at for tasks and pipelines.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
130
131
132 $SIG{'USR1'} = sub
133 {
134   $main::ENV{CRUNCH_DEBUG} = 1;
135 };
136 $SIG{'USR2'} = sub
137 {
138   $main::ENV{CRUNCH_DEBUG} = 0;
139 };
140
141
142
143 my $arv = Arvados->new('apiVersion' => 'v1');
144 my $local_logfile;
145
146 my $User = $arv->{'users'}->{'current'}->execute;
147
148 my $Job = {};
149 my $job_id;
150 my $dbh;
151 my $sth;
152 if ($job_has_uuid)
153 {
154   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155   if (!$force_unlock) {
156     # If some other crunch-job process has grabbed this job (or we see
157     # other evidence that the job is already underway) we exit
158     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159     # mark the job as failed.
160     if ($Job->{'is_locked_by_uuid'}) {
161       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
162       exit EX_TEMPFAIL;
163     }
164     if ($Job->{'success'} ne undef) {
165       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
166       exit EX_TEMPFAIL;
167     }
168     if ($Job->{'running'}) {
169       Log(undef, "Job 'running' flag is already set");
170       exit EX_TEMPFAIL;
171     }
172     if ($Job->{'started_at'}) {
173       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
174       exit EX_TEMPFAIL;
175     }
176   }
177 }
178 else
179 {
180   $Job = JSON::decode_json($jobspec);
181
182   if (!$resume_stash)
183   {
184     map { croak ("No $_ specified") unless $Job->{$_} }
185     qw(script script_version script_parameters);
186   }
187
188   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
189   $Job->{'started_at'} = gmtime;
190
191   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
192
193   $job_has_uuid = 1;
194 }
195 $job_id = $Job->{'uuid'};
196
197 my $keep_logfile = $job_id . '.log.txt';
198 $local_logfile = File::Temp->new();
199
200 $Job->{'runtime_constraints'} ||= {};
201 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
202 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
203
204
205 Log (undef, "check slurm allocation");
206 my @slot;
207 my @node;
208 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
209 my @sinfo;
210 if (!$have_slurm)
211 {
212   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
213   push @sinfo, "$localcpus localhost";
214 }
215 if (exists $ENV{SLURM_NODELIST})
216 {
217   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
218 }
219 foreach (@sinfo)
220 {
221   my ($ncpus, $slurm_nodelist) = split;
222   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
223
224   my @nodelist;
225   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
226   {
227     my $nodelist = $1;
228     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
229     {
230       my $ranges = $1;
231       foreach (split (",", $ranges))
232       {
233         my ($a, $b);
234         if (/(\d+)-(\d+)/)
235         {
236           $a = $1;
237           $b = $2;
238         }
239         else
240         {
241           $a = $_;
242           $b = $_;
243         }
244         push @nodelist, map {
245           my $n = $nodelist;
246           $n =~ s/\[[-,\d]+\]/$_/;
247           $n;
248         } ($a..$b);
249       }
250     }
251     else
252     {
253       push @nodelist, $nodelist;
254     }
255   }
256   foreach my $nodename (@nodelist)
257   {
258     Log (undef, "node $nodename - $ncpus slots");
259     my $node = { name => $nodename,
260                  ncpus => $ncpus,
261                  losing_streak => 0,
262                  hold_until => 0 };
263     foreach my $cpu (1..$ncpus)
264     {
265       push @slot, { node => $node,
266                     cpu => $cpu };
267     }
268   }
269   push @node, @nodelist;
270 }
271
272
273
274 # Ensure that we get one jobstep running on each allocated node before
275 # we start overloading nodes with concurrent steps
276
277 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
278
279
280
281 my $jobmanager_id;
282 if ($job_has_uuid)
283 {
284   # Claim this job, and make sure nobody else does
285   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
286           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
287     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
288     exit EX_TEMPFAIL;
289   }
290   $Job->update_attributes('started_at' => scalar gmtime,
291                           'running' => 1,
292                           'success' => undef,
293                           'tasks_summary' => { 'failed' => 0,
294                                                'todo' => 1,
295                                                'running' => 0,
296                                                'done' => 0 });
297 }
298
299
300 Log (undef, "start");
301 $SIG{'INT'} = sub { $main::please_freeze = 1; };
302 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
303 $SIG{'TERM'} = \&croak;
304 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
305 $SIG{'ALRM'} = sub { $main::please_info = 1; };
306 $SIG{'CONT'} = sub { $main::please_continue = 1; };
307 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
308
309 $main::please_freeze = 0;
310 $main::please_info = 0;
311 $main::please_continue = 0;
312 $main::please_refresh = 0;
313 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
314
315 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
316 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
317 $ENV{"JOB_UUID"} = $job_id;
318
319
320 my @jobstep;
321 my @jobstep_todo = ();
322 my @jobstep_done = ();
323 my @jobstep_tomerge = ();
324 my $jobstep_tomerge_level = 0;
325 my $squeue_checked;
326 my $squeue_kill_checked;
327 my $output_in_keep = 0;
328 my $latest_refresh = scalar time;
329
330
331
332 if (defined $Job->{thawedfromkey})
333 {
334   thaw ($Job->{thawedfromkey});
335 }
336 else
337 {
338   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
339     'job_uuid' => $Job->{'uuid'},
340     'sequence' => 0,
341     'qsequence' => 0,
342     'parameters' => {},
343                                                           });
344   push @jobstep, { 'level' => 0,
345                    'failures' => 0,
346                    'arvados_task' => $first_task,
347                  };
348   push @jobstep_todo, 0;
349 }
350
351
352 if (!$have_slurm)
353 {
354   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
355 }
356
357
358 my $build_script;
359
360
361 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
362
363 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
364 if ($skip_install)
365 {
366   if (!defined $no_clear_tmp) {
367     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
368     system($clear_tmp_cmd) == 0
369         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
370   }
371   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
372   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
373     if (-d $src_path) {
374       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
375           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
376       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
377           == 0
378           or croak ("setup.py in $src_path failed: exit ".($?>>8));
379     }
380   }
381 }
382 else
383 {
384   do {
385     local $/ = undef;
386     $build_script = <DATA>;
387   };
388   Log (undef, "Install revision ".$Job->{script_version});
389   my $nodelist = join(",", @node);
390
391   if (!defined $no_clear_tmp) {
392     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
393
394     my $cleanpid = fork();
395     if ($cleanpid == 0)
396     {
397       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
399       exit (1);
400     }
401     while (1)
402     {
403       last if $cleanpid == waitpid (-1, WNOHANG);
404       freeze_if_want_freeze ($cleanpid);
405       select (undef, undef, undef, 0.1);
406     }
407     Log (undef, "Clean-work-dir exited $?");
408   }
409
410   # Install requested code version
411
412   my @execargs;
413   my @srunargs = ("srun",
414                   "--nodelist=$nodelist",
415                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
416
417   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
418   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
419
420   my $commit;
421   my $git_archive;
422   my $treeish = $Job->{'script_version'};
423
424   # If we're running under crunch-dispatch, it will have pulled the
425   # appropriate source tree into its own repository, and given us that
426   # repo's path as $git_dir. If we're running a "local" job, and a
427   # script_version was specified, it's up to the user to provide the
428   # full path to a local repository in Job->{repository}.
429   #
430   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
431   # git-archive --remote where appropriate.
432   #
433   # TODO: Accept a locally-hosted Arvados repository by name or
434   # UUID. Use arvados.v1.repositories.list or .get to figure out the
435   # appropriate fetch-url.
436   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
437
438   $ENV{"CRUNCH_SRC_URL"} = $repo;
439
440   if (-d "$repo/.git") {
441     # We were given a working directory, but we are only interested in
442     # the index.
443     $repo = "$repo/.git";
444   }
445
446   # If this looks like a subversion r#, look for it in git-svn commit messages
447
448   if ($treeish =~ m{^\d{1,4}$}) {
449     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
450     chomp $gitlog;
451     if ($gitlog =~ /^[a-f0-9]{40}$/) {
452       $commit = $gitlog;
453       Log (undef, "Using commit $commit for script_version $treeish");
454     }
455   }
456
457   # If that didn't work, try asking git to look it up as a tree-ish.
458
459   if (!defined $commit) {
460     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
461     chomp $found;
462     if ($found =~ /^[0-9a-f]{40}$/s) {
463       $commit = $found;
464       if ($commit ne $treeish) {
465         # Make sure we record the real commit id in the database,
466         # frozentokey, logs, etc. -- instead of an abbreviation or a
467         # branch name which can become ambiguous or point to a
468         # different commit in the future.
469         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
470         Log (undef, "Using commit $commit for tree-ish $treeish");
471         if ($commit ne $treeish) {
472           $Job->{'script_version'} = $commit;
473           !$job_has_uuid or
474               $Job->update_attributes('script_version' => $commit) or
475               croak("Error while updating job");
476         }
477       }
478     }
479   }
480
481   if (defined $commit) {
482     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
483     @execargs = ("sh", "-c",
484                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
485     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
486   }
487   else {
488     croak ("could not figure out commit id for $treeish");
489   }
490
491   # Note: this section is almost certainly unnecessary if we're
492   # running tasks in docker containers.
493   my $installpid = fork();
494   if ($installpid == 0)
495   {
496     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
497     exit (1);
498   }
499   while (1)
500   {
501     last if $installpid == waitpid (-1, WNOHANG);
502     freeze_if_want_freeze ($installpid);
503     select (undef, undef, undef, 0.1);
504   }
505   Log (undef, "Install exited $?");
506 }
507
508 if (!$have_slurm)
509 {
510   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
511   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
512 }
513
514 # If this job requires a Docker image, install that.
515 my $docker_bin = "/usr/bin/docker.io";
516 my ($docker_locator, $docker_stream, $docker_hash);
517 if ($docker_locator = $Job->{docker_image_locator}) {
518   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
519   if (!$docker_hash)
520   {
521     croak("No Docker image hash found from locator $docker_locator");
522   }
523   $docker_stream =~ s/^\.//;
524   my $docker_install_script = qq{
525 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
526     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
527 fi
528 };
529   my $docker_pid = fork();
530   if ($docker_pid == 0)
531   {
532     srun (["srun", "--nodelist=" . join(',', @node)],
533           ["/bin/sh", "-ec", $docker_install_script]);
534     exit ($?);
535   }
536   while (1)
537   {
538     last if $docker_pid == waitpid (-1, WNOHANG);
539     freeze_if_want_freeze ($docker_pid);
540     select (undef, undef, undef, 0.1);
541   }
542   if ($? != 0)
543   {
544     croak("Installing Docker image from $docker_locator returned exit code $?");
545   }
546 }
547
548 foreach (qw (script script_version script_parameters runtime_constraints))
549 {
550   Log (undef,
551        "$_ " .
552        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
553 }
554 foreach (split (/\n/, $Job->{knobs}))
555 {
556   Log (undef, "knob " . $_);
557 }
558
559
560
561 $main::success = undef;
562
563
564
565 ONELEVEL:
566
567 my $thisround_succeeded = 0;
568 my $thisround_failed = 0;
569 my $thisround_failed_multiple = 0;
570
571 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
572                        or $a <=> $b } @jobstep_todo;
573 my $level = $jobstep[$jobstep_todo[0]]->{level};
574 Log (undef, "start level $level");
575
576
577
578 my %proc;
579 my @freeslot = (0..$#slot);
580 my @holdslot;
581 my %reader;
582 my $progress_is_dirty = 1;
583 my $progress_stats_updated = 0;
584
585 update_progress_stats();
586
587
588
589 THISROUND:
590 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
591 {
592   my $id = $jobstep_todo[$todo_ptr];
593   my $Jobstep = $jobstep[$id];
594   if ($Jobstep->{level} != $level)
595   {
596     next;
597   }
598
599   pipe $reader{$id}, "writer" or croak ($!);
600   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
601   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
602
603   my $childslot = $freeslot[0];
604   my $childnode = $slot[$childslot]->{node};
605   my $childslotname = join (".",
606                             $slot[$childslot]->{node}->{name},
607                             $slot[$childslot]->{cpu});
608   my $childpid = fork();
609   if ($childpid == 0)
610   {
611     $SIG{'INT'} = 'DEFAULT';
612     $SIG{'QUIT'} = 'DEFAULT';
613     $SIG{'TERM'} = 'DEFAULT';
614
615     foreach (values (%reader))
616     {
617       close($_);
618     }
619     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
620     open(STDOUT,">&writer");
621     open(STDERR,">&writer");
622
623     undef $dbh;
624     undef $sth;
625
626     delete $ENV{"GNUPGHOME"};
627     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
628     $ENV{"TASK_QSEQUENCE"} = $id;
629     $ENV{"TASK_SEQUENCE"} = $level;
630     $ENV{"JOB_SCRIPT"} = $Job->{script};
631     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
632       $param =~ tr/a-z/A-Z/;
633       $ENV{"JOB_PARAMETER_$param"} = $value;
634     }
635     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
636     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
637     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
638     $ENV{"HOME"} = $ENV{"TASK_WORK"};
639     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
640     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
641     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
642     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
643
644     $ENV{"GZIP"} = "-n";
645
646     my @srunargs = (
647       "srun",
648       "--nodelist=".$childnode->{name},
649       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
650       "--job-name=$job_id.$id.$$",
651         );
652     my $build_script_to_send = "";
653     my $command =
654         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
655         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
656         ."&& cd $ENV{CRUNCH_TMP} ";
657     if ($build_script)
658     {
659       $build_script_to_send = $build_script;
660       $command .=
661           "&& perl -";
662     }
663     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
664     if ($docker_hash)
665     {
666       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
667       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
668       # Dynamically configure the container to use the host system as its
669       # DNS server.  Get the host's global addresses from the ip command,
670       # and turn them into docker --dns options using gawk.
671       $command .=
672           q{$(ip -o address show scope global |
673               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
674       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
675       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
676       $command .= "--env=\QHOME=/home/crunch\E ";
677       while (my ($env_key, $env_val) = each %ENV)
678       {
679         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
680           if ($env_key eq "TASK_WORK") {
681             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
682           }
683           elsif ($env_key eq "TASK_KEEPMOUNT") {
684             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
685           }
686           else {
687             $command .= "--env=\Q$env_key=$env_val\E ";
688           }
689         }
690       }
691       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
692       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
693       $command .= "\Q$docker_hash\E ";
694       $command .= "stdbuf --output=0 --error=0 ";
695       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
696     } else {
697       # Non-docker run
698       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
699       $command .= "stdbuf --output=0 --error=0 ";
700       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
701     }
702
703     my @execargs = ('bash', '-c', $command);
704     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
705     # exec() failed, we assume nothing happened.
706     Log(undef, "srun() failed on build script");
707     die;
708   }
709   close("writer");
710   if (!defined $childpid)
711   {
712     close $reader{$id};
713     delete $reader{$id};
714     next;
715   }
716   shift @freeslot;
717   $proc{$childpid} = { jobstep => $id,
718                        time => time,
719                        slot => $childslot,
720                        jobstepname => "$job_id.$id.$childpid",
721                      };
722   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
723   $slot[$childslot]->{pid} = $childpid;
724
725   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
726   Log ($id, "child $childpid started on $childslotname");
727   $Jobstep->{starttime} = time;
728   $Jobstep->{node} = $childnode->{name};
729   $Jobstep->{slotindex} = $childslot;
730   delete $Jobstep->{stderr};
731   delete $Jobstep->{finishtime};
732
733   splice @jobstep_todo, $todo_ptr, 1;
734   --$todo_ptr;
735
736   $progress_is_dirty = 1;
737
738   while (!@freeslot
739          ||
740          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
741   {
742     last THISROUND if $main::please_freeze;
743     if ($main::please_info)
744     {
745       $main::please_info = 0;
746       freeze();
747       collate_output();
748       save_meta(1);
749       update_progress_stats();
750     }
751     my $gotsome
752         = readfrompipes ()
753         + reapchildren ();
754     if (!$gotsome)
755     {
756       check_refresh_wanted();
757       check_squeue();
758       update_progress_stats();
759       select (undef, undef, undef, 0.1);
760     }
761     elsif (time - $progress_stats_updated >= 30)
762     {
763       update_progress_stats();
764     }
765     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
766         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
767     {
768       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
769           .($thisround_failed+$thisround_succeeded)
770           .") -- giving up on this round";
771       Log (undef, $message);
772       last THISROUND;
773     }
774
775     # move slots from freeslot to holdslot (or back to freeslot) if necessary
776     for (my $i=$#freeslot; $i>=0; $i--) {
777       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
778         push @holdslot, (splice @freeslot, $i, 1);
779       }
780     }
781     for (my $i=$#holdslot; $i>=0; $i--) {
782       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
783         push @freeslot, (splice @holdslot, $i, 1);
784       }
785     }
786
787     # give up if no nodes are succeeding
788     if (!grep { $_->{node}->{losing_streak} == 0 &&
789                     $_->{node}->{hold_count} < 4 } @slot) {
790       my $message = "Every node has failed -- giving up on this round";
791       Log (undef, $message);
792       last THISROUND;
793     }
794   }
795 }
796
797
798 push @freeslot, splice @holdslot;
799 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
800
801
802 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
803 while (%proc)
804 {
805   if ($main::please_continue) {
806     $main::please_continue = 0;
807     goto THISROUND;
808   }
809   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
810   readfrompipes ();
811   if (!reapchildren())
812   {
813     check_refresh_wanted();
814     check_squeue();
815     update_progress_stats();
816     select (undef, undef, undef, 0.1);
817     killem (keys %proc) if $main::please_freeze;
818   }
819 }
820
821 update_progress_stats();
822 freeze_if_want_freeze();
823
824
825 if (!defined $main::success)
826 {
827   if (@jobstep_todo &&
828       $thisround_succeeded == 0 &&
829       ($thisround_failed == 0 || $thisround_failed > 4))
830   {
831     my $message = "stop because $thisround_failed tasks failed and none succeeded";
832     Log (undef, $message);
833     $main::success = 0;
834   }
835   if (!@jobstep_todo)
836   {
837     $main::success = 1;
838   }
839 }
840
841 goto ONELEVEL if !defined $main::success;
842
843
844 release_allocation();
845 freeze();
846 my $collated_output = &collate_output();
847
848 if ($job_has_uuid) {
849   $Job->update_attributes('running' => 0,
850                           'success' => $collated_output && $main::success,
851                           'finished_at' => scalar gmtime)
852 }
853
854 if (!$collated_output) {
855   Log(undef, "output undef");
856 }
857 else {
858   eval {
859     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
860         or die "failed to get collated manifest: $!";
861     my $orig_manifest_text = '';
862     while (my $manifest_line = <$orig_manifest>) {
863       $orig_manifest_text .= $manifest_line;
864     }
865     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
866       'manifest_text' => $orig_manifest_text,
867     });
868     Log(undef, "output uuid " . $output->{uuid});
869     Log(undef, "output hash " . $output->{portable_data_hash});
870     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
871   };
872   if ($@) {
873     Log (undef, "Failed to register output manifest: $@");
874   }
875 }
876
877 Log (undef, "finish");
878
879 save_meta();
880 exit ($Job->{'success'} ? 1 : 0);
881
882
883
884 sub update_progress_stats
885 {
886   $progress_stats_updated = time;
887   return if !$progress_is_dirty;
888   my ($todo, $done, $running) = (scalar @jobstep_todo,
889                                  scalar @jobstep_done,
890                                  scalar @slot - scalar @freeslot - scalar @holdslot);
891   $Job->{'tasks_summary'} ||= {};
892   $Job->{'tasks_summary'}->{'todo'} = $todo;
893   $Job->{'tasks_summary'}->{'done'} = $done;
894   $Job->{'tasks_summary'}->{'running'} = $running;
895   if ($job_has_uuid) {
896     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
897   }
898   Log (undef, "status: $done done, $running running, $todo todo");
899   $progress_is_dirty = 0;
900 }
901
902
903
904 sub reapchildren
905 {
906   my $pid = waitpid (-1, WNOHANG);
907   return 0 if $pid <= 0;
908
909   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
910                   . "."
911                   . $slot[$proc{$pid}->{slot}]->{cpu});
912   my $jobstepid = $proc{$pid}->{jobstep};
913   my $elapsed = time - $proc{$pid}->{time};
914   my $Jobstep = $jobstep[$jobstepid];
915
916   my $childstatus = $?;
917   my $exitvalue = $childstatus >> 8;
918   my $exitinfo = sprintf("exit %d signal %d%s",
919                          $exitvalue,
920                          $childstatus & 127,
921                          ($childstatus & 128 ? ' core dump' : ''));
922   $Jobstep->{'arvados_task'}->reload;
923   my $task_success = $Jobstep->{'arvados_task'}->{success};
924
925   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
926
927   if (!defined $task_success) {
928     # task did not indicate one way or the other --> fail
929     $Jobstep->{'arvados_task'}->{success} = 0;
930     $Jobstep->{'arvados_task'}->save;
931     $task_success = 0;
932   }
933
934   if (!$task_success)
935   {
936     my $temporary_fail;
937     $temporary_fail ||= $Jobstep->{node_fail};
938     $temporary_fail ||= ($exitvalue == 111);
939
940     ++$thisround_failed;
941     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
942
943     # Check for signs of a failed or misconfigured node
944     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
945         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
946       # Don't count this against jobstep failure thresholds if this
947       # node is already suspected faulty and srun exited quickly
948       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
949           $elapsed < 5) {
950         Log ($jobstepid, "blaming failure on suspect node " .
951              $slot[$proc{$pid}->{slot}]->{node}->{name});
952         $temporary_fail ||= 1;
953       }
954       ban_node_by_slot($proc{$pid}->{slot});
955     }
956
957     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
958                              ++$Jobstep->{'failures'},
959                              $temporary_fail ? 'temporary ' : 'permanent',
960                              $elapsed));
961
962     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
963       # Give up on this task, and the whole job
964       $main::success = 0;
965       $main::please_freeze = 1;
966     }
967     else {
968       # Put this task back on the todo queue
969       push @jobstep_todo, $jobstepid;
970     }
971     $Job->{'tasks_summary'}->{'failed'}++;
972   }
973   else
974   {
975     ++$thisround_succeeded;
976     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
977     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
978     push @jobstep_done, $jobstepid;
979     Log ($jobstepid, "success in $elapsed seconds");
980   }
981   $Jobstep->{exitcode} = $childstatus;
982   $Jobstep->{finishtime} = time;
983   $Jobstep->{'arvados_task'}->{started_at} = strftime "Y%-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
984   $Jobstep->{'arvados_task'}->{finished_at} = strftime "Y%-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
985   $Jobstep->{'arvados_task'}->save;
986   process_stderr ($jobstepid, $task_success);
987   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
988
989   close $reader{$jobstepid};
990   delete $reader{$jobstepid};
991   delete $slot[$proc{$pid}->{slot}]->{pid};
992   push @freeslot, $proc{$pid}->{slot};
993   delete $proc{$pid};
994
995   if ($task_success) {
996     # Load new tasks
997     my $newtask_list = [];
998     my $newtask_results;
999     do {
1000       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1001         'where' => {
1002           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1003         },
1004         'order' => 'qsequence',
1005         'offset' => scalar(@$newtask_list),
1006       );
1007       push(@$newtask_list, @{$newtask_results->{items}});
1008     } while (@{$newtask_results->{items}});
1009     foreach my $arvados_task (@$newtask_list) {
1010       my $jobstep = {
1011         'level' => $arvados_task->{'sequence'},
1012         'failures' => 0,
1013         'arvados_task' => $arvados_task
1014       };
1015       push @jobstep, $jobstep;
1016       push @jobstep_todo, $#jobstep;
1017     }
1018   }
1019
1020   $progress_is_dirty = 1;
1021   1;
1022 }
1023
1024 sub check_refresh_wanted
1025 {
1026   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1027   if (@stat && $stat[9] > $latest_refresh) {
1028     $latest_refresh = scalar time;
1029     if ($job_has_uuid) {
1030       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1031       for my $attr ('cancelled_at',
1032                     'cancelled_by_user_uuid',
1033                     'cancelled_by_client_uuid') {
1034         $Job->{$attr} = $Job2->{$attr};
1035       }
1036       if ($Job->{'cancelled_at'}) {
1037         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1038              " by user " . $Job->{cancelled_by_user_uuid});
1039         $main::success = 0;
1040         $main::please_freeze = 1;
1041       }
1042     }
1043   }
1044 }
1045
1046 sub check_squeue
1047 {
1048   # return if the kill list was checked <4 seconds ago
1049   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1050   {
1051     return;
1052   }
1053   $squeue_kill_checked = time;
1054
1055   # use killem() on procs whose killtime is reached
1056   for (keys %proc)
1057   {
1058     if (exists $proc{$_}->{killtime}
1059         && $proc{$_}->{killtime} <= time)
1060     {
1061       killem ($_);
1062     }
1063   }
1064
1065   # return if the squeue was checked <60 seconds ago
1066   if (defined $squeue_checked && $squeue_checked > time - 60)
1067   {
1068     return;
1069   }
1070   $squeue_checked = time;
1071
1072   if (!$have_slurm)
1073   {
1074     # here is an opportunity to check for mysterious problems with local procs
1075     return;
1076   }
1077
1078   # get a list of steps still running
1079   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1080   chop @squeue;
1081   if ($squeue[-1] ne "ok")
1082   {
1083     return;
1084   }
1085   pop @squeue;
1086
1087   # which of my jobsteps are running, according to squeue?
1088   my %ok;
1089   foreach (@squeue)
1090   {
1091     if (/^(\d+)\.(\d+) (\S+)/)
1092     {
1093       if ($1 eq $ENV{SLURM_JOBID})
1094       {
1095         $ok{$3} = 1;
1096       }
1097     }
1098   }
1099
1100   # which of my active child procs (>60s old) were not mentioned by squeue?
1101   foreach (keys %proc)
1102   {
1103     if ($proc{$_}->{time} < time - 60
1104         && !exists $ok{$proc{$_}->{jobstepname}}
1105         && !exists $proc{$_}->{killtime})
1106     {
1107       # kill this proc if it hasn't exited in 30 seconds
1108       $proc{$_}->{killtime} = time + 30;
1109     }
1110   }
1111 }
1112
1113
1114 sub release_allocation
1115 {
1116   if ($have_slurm)
1117   {
1118     Log (undef, "release job allocation");
1119     system "scancel $ENV{SLURM_JOBID}";
1120   }
1121 }
1122
1123
1124 sub readfrompipes
1125 {
1126   my $gotsome = 0;
1127   foreach my $job (keys %reader)
1128   {
1129     my $buf;
1130     while (0 < sysread ($reader{$job}, $buf, 8192))
1131     {
1132       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1133       $jobstep[$job]->{stderr} .= $buf;
1134       preprocess_stderr ($job);
1135       if (length ($jobstep[$job]->{stderr}) > 16384)
1136       {
1137         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1138       }
1139       $gotsome = 1;
1140     }
1141   }
1142   return $gotsome;
1143 }
1144
1145
1146 sub preprocess_stderr
1147 {
1148   my $job = shift;
1149
1150   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1151     my $line = $1;
1152     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1153     Log ($job, "stderr $line");
1154     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1155       # whoa.
1156       $main::please_freeze = 1;
1157     }
1158     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1159       $jobstep[$job]->{node_fail} = 1;
1160       ban_node_by_slot($jobstep[$job]->{slotindex});
1161     }
1162   }
1163 }
1164
1165
1166 sub process_stderr
1167 {
1168   my $job = shift;
1169   my $task_success = shift;
1170   preprocess_stderr ($job);
1171
1172   map {
1173     Log ($job, "stderr $_");
1174   } split ("\n", $jobstep[$job]->{stderr});
1175 }
1176
1177 sub fetch_block
1178 {
1179   my $hash = shift;
1180   my ($keep, $child_out, $output_block);
1181
1182   my $cmd = "arv-get \Q$hash\E";
1183   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1184   $output_block = '';
1185   while (1) {
1186     my $buf;
1187     my $bytes = sysread($keep, $buf, 1024 * 1024);
1188     if (!defined $bytes) {
1189       die "reading from arv-get: $!";
1190     } elsif ($bytes == 0) {
1191       # sysread returns 0 at the end of the pipe.
1192       last;
1193     } else {
1194       # some bytes were read into buf.
1195       $output_block .= $buf;
1196     }
1197   }
1198   close $keep;
1199   return $output_block;
1200 }
1201
1202 sub collate_output
1203 {
1204   Log (undef, "collate");
1205
1206   my ($child_out, $child_in);
1207   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1208   my $joboutput;
1209   for (@jobstep)
1210   {
1211     next if (!exists $_->{'arvados_task'}->{'output'} ||
1212              !$_->{'arvados_task'}->{'success'});
1213     my $output = $_->{'arvados_task'}->{output};
1214     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1215     {
1216       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1217       print $child_in $output;
1218     }
1219     elsif (@jobstep == 1)
1220     {
1221       $joboutput = $output;
1222       last;
1223     }
1224     elsif (defined (my $outblock = fetch_block ($output)))
1225     {
1226       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1227       print $child_in $outblock;
1228     }
1229     else
1230     {
1231       Log (undef, "XXX fetch_block($output) failed XXX");
1232       $main::success = 0;
1233     }
1234   }
1235   $child_in->close;
1236
1237   if (!defined $joboutput) {
1238     my $s = IO::Select->new($child_out);
1239     if ($s->can_read(120)) {
1240       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1241       chomp($joboutput);
1242     } else {
1243       Log (undef, "timed out reading from 'arv-put'");
1244     }
1245   }
1246   waitpid($pid, 0);
1247
1248   return $joboutput;
1249 }
1250
1251
1252 sub killem
1253 {
1254   foreach (@_)
1255   {
1256     my $sig = 2;                # SIGINT first
1257     if (exists $proc{$_}->{"sent_$sig"} &&
1258         time - $proc{$_}->{"sent_$sig"} > 4)
1259     {
1260       $sig = 15;                # SIGTERM if SIGINT doesn't work
1261     }
1262     if (exists $proc{$_}->{"sent_$sig"} &&
1263         time - $proc{$_}->{"sent_$sig"} > 4)
1264     {
1265       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1266     }
1267     if (!exists $proc{$_}->{"sent_$sig"})
1268     {
1269       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1270       kill $sig, $_;
1271       select (undef, undef, undef, 0.1);
1272       if ($sig == 2)
1273       {
1274         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1275       }
1276       $proc{$_}->{"sent_$sig"} = time;
1277       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1278     }
1279   }
1280 }
1281
1282
1283 sub fhbits
1284 {
1285   my($bits);
1286   for (@_) {
1287     vec($bits,fileno($_),1) = 1;
1288   }
1289   $bits;
1290 }
1291
1292
1293 sub Log                         # ($jobstep_id, $logmessage)
1294 {
1295   if ($_[1] =~ /\n/) {
1296     for my $line (split (/\n/, $_[1])) {
1297       Log ($_[0], $line);
1298     }
1299     return;
1300   }
1301   my $fh = select STDERR; $|=1; select $fh;
1302   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1303   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1304   $message .= "\n";
1305   my $datetime;
1306   if ($local_logfile || -t STDERR) {
1307     my @gmtime = gmtime;
1308     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1309                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1310   }
1311   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1312
1313   if ($local_logfile) {
1314     print $local_logfile $datetime . " " . $message;
1315   }
1316 }
1317
1318
1319 sub croak
1320 {
1321   my ($package, $file, $line) = caller;
1322   my $message = "@_ at $file line $line\n";
1323   Log (undef, $message);
1324   freeze() if @jobstep_todo;
1325   collate_output() if @jobstep_todo;
1326   cleanup();
1327   save_meta() if $local_logfile;
1328   die;
1329 }
1330
1331
1332 sub cleanup
1333 {
1334   return if !$job_has_uuid;
1335   $Job->update_attributes('running' => 0,
1336                           'success' => 0,
1337                           'finished_at' => scalar gmtime);
1338 }
1339
1340
1341 sub save_meta
1342 {
1343   my $justcheckpoint = shift; # false if this will be the last meta saved
1344   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1345
1346   $local_logfile->flush;
1347   my $cmd = "arv-put --portable-data-hash --filename ''\Q$keep_logfile\E "
1348       . quotemeta($local_logfile->filename);
1349   my $loglocator = `$cmd`;
1350   die "system $cmd failed: $?" if $?;
1351   chomp($loglocator);
1352
1353   $local_logfile = undef;   # the temp file is automatically deleted
1354   Log (undef, "log manifest is $loglocator");
1355   $Job->{'log'} = $loglocator;
1356   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1357 }
1358
1359
1360 sub freeze_if_want_freeze
1361 {
1362   if ($main::please_freeze)
1363   {
1364     release_allocation();
1365     if (@_)
1366     {
1367       # kill some srun procs before freeze+stop
1368       map { $proc{$_} = {} } @_;
1369       while (%proc)
1370       {
1371         killem (keys %proc);
1372         select (undef, undef, undef, 0.1);
1373         my $died;
1374         while (($died = waitpid (-1, WNOHANG)) > 0)
1375         {
1376           delete $proc{$died};
1377         }
1378       }
1379     }
1380     freeze();
1381     collate_output();
1382     cleanup();
1383     save_meta();
1384     exit 0;
1385   }
1386 }
1387
1388
1389 sub freeze
1390 {
1391   Log (undef, "Freeze not implemented");
1392   return;
1393 }
1394
1395
1396 sub thaw
1397 {
1398   croak ("Thaw not implemented");
1399 }
1400
1401
1402 sub freezequote
1403 {
1404   my $s = shift;
1405   $s =~ s/\\/\\\\/g;
1406   $s =~ s/\n/\\n/g;
1407   return $s;
1408 }
1409
1410
1411 sub freezeunquote
1412 {
1413   my $s = shift;
1414   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1415   return $s;
1416 }
1417
1418
1419 sub srun
1420 {
1421   my $srunargs = shift;
1422   my $execargs = shift;
1423   my $opts = shift || {};
1424   my $stdin = shift;
1425   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1426   print STDERR (join (" ",
1427                       map { / / ? "'$_'" : $_ }
1428                       (@$args)),
1429                 "\n")
1430       if $ENV{CRUNCH_DEBUG};
1431
1432   if (defined $stdin) {
1433     my $child = open STDIN, "-|";
1434     defined $child or die "no fork: $!";
1435     if ($child == 0) {
1436       print $stdin or die $!;
1437       close STDOUT or die $!;
1438       exit 0;
1439     }
1440   }
1441
1442   return system (@$args) if $opts->{fork};
1443
1444   exec @$args;
1445   warn "ENV size is ".length(join(" ",%ENV));
1446   die "exec failed: $!: @$args";
1447 }
1448
1449
1450 sub ban_node_by_slot {
1451   # Don't start any new jobsteps on this node for 60 seconds
1452   my $slotid = shift;
1453   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1454   $slot[$slotid]->{node}->{hold_count}++;
1455   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1456 }
1457
1458 sub must_lock_now
1459 {
1460   my ($lockfile, $error_message) = @_;
1461   open L, ">", $lockfile or croak("$lockfile: $!");
1462   if (!flock L, LOCK_EX|LOCK_NB) {
1463     croak("Can't lock $lockfile: $error_message\n");
1464   }
1465 }
1466
1467 sub find_docker_image {
1468   # Given a Keep locator, check to see if it contains a Docker image.
1469   # If so, return its stream name and Docker hash.
1470   # If not, return undef for both values.
1471   my $locator = shift;
1472   my ($streamname, $filename);
1473   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1474     foreach my $line (split(/\n/, $image->{manifest_text})) {
1475       my @tokens = split(/\s+/, $line);
1476       next if (!@tokens);
1477       $streamname = shift(@tokens);
1478       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1479         if (defined($filename)) {
1480           return (undef, undef);  # More than one file in the Collection.
1481         } else {
1482           $filename = (split(/:/, $filedata, 3))[2];
1483         }
1484       }
1485     }
1486   }
1487   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1488     return ($streamname, $1);
1489   } else {
1490     return (undef, undef);
1491   }
1492 }
1493
1494 __DATA__
1495 #!/usr/bin/perl
1496
1497 # checkout-and-build
1498
1499 use Fcntl ':flock';
1500 use File::Path qw( make_path );
1501
1502 my $destdir = $ENV{"CRUNCH_SRC"};
1503 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1504 my $repo = $ENV{"CRUNCH_SRC_URL"};
1505 my $task_work = $ENV{"TASK_WORK"};
1506
1507 for my $dir ($destdir, $task_work) {
1508     if ($dir) {
1509         make_path $dir;
1510         -e $dir or die "Failed to create temporary directory ($dir): $!";
1511     }
1512 }
1513
1514 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1515 flock L, LOCK_EX;
1516 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1517     if (@ARGV) {
1518         exec(@ARGV);
1519         die "Cannot exec `@ARGV`: $!";
1520     } else {
1521         exit 0;
1522     }
1523 }
1524
1525 unlink "$destdir.commit";
1526 open STDOUT, ">", "$destdir.log";
1527 open STDERR, ">&STDOUT";
1528
1529 mkdir $destdir;
1530 my @git_archive_data = <DATA>;
1531 if (@git_archive_data) {
1532   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1533   print TARX @git_archive_data;
1534   if(!close(TARX)) {
1535     die "'tar -C $destdir -xf -' exited $?: $!";
1536   }
1537 }
1538
1539 my $pwd;
1540 chomp ($pwd = `pwd`);
1541 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1542 mkdir $install_dir;
1543
1544 for my $src_path ("$destdir/arvados/sdk/python") {
1545   if (-d $src_path) {
1546     shell_or_die ("virtualenv", $install_dir);
1547     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1548   }
1549 }
1550
1551 if (-e "$destdir/crunch_scripts/install") {
1552     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1553 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1554     # Old version
1555     shell_or_die ("./tests/autotests.sh", $install_dir);
1556 } elsif (-e "./install.sh") {
1557     shell_or_die ("./install.sh", $install_dir);
1558 }
1559
1560 if ($commit) {
1561     unlink "$destdir.commit.new";
1562     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1563     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1564 }
1565
1566 close L;
1567
1568 if (@ARGV) {
1569     exec(@ARGV);
1570     die "Cannot exec `@ARGV`: $!";
1571 } else {
1572     exit 0;
1573 }
1574
1575 sub shell_or_die
1576 {
1577   if ($ENV{"DEBUG"}) {
1578     print STDERR "@_\n";
1579   }
1580   system (@_) == 0
1581       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1582 }
1583
1584 __DATA__