3570: Do not fail the job when crunch-job loses a locking race during startup.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 $ENV{"TMPDIR"} ||= "/tmp";
88 unless (defined $ENV{"CRUNCH_TMP"}) {
89   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
90   if ($ENV{"USER"} ne "crunch" && $< != 0) {
91     # use a tmp dir unique for my uid
92     $ENV{"CRUNCH_TMP"} .= "-$<";
93   }
94 }
95
96 # Create the tmp directory if it does not exist
97 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
98   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
99 }
100
101 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
102 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
103 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
104 mkdir ($ENV{"JOB_WORK"});
105
106 my $force_unlock;
107 my $git_dir;
108 my $jobspec;
109 my $job_api_token;
110 my $no_clear_tmp;
111 my $resume_stash;
112 GetOptions('force-unlock' => \$force_unlock,
113            'git-dir=s' => \$git_dir,
114            'job=s' => \$jobspec,
115            'job-api-token=s' => \$job_api_token,
116            'no-clear-tmp' => \$no_clear_tmp,
117            'resume-stash=s' => \$resume_stash,
118     );
119
120 if (defined $job_api_token) {
121   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
122 }
123
124 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
125 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
126 my $local_job = !$job_has_uuid;
127
128
129 $SIG{'USR1'} = sub
130 {
131   $main::ENV{CRUNCH_DEBUG} = 1;
132 };
133 $SIG{'USR2'} = sub
134 {
135   $main::ENV{CRUNCH_DEBUG} = 0;
136 };
137
138
139
140 my $arv = Arvados->new('apiVersion' => 'v1');
141 my $local_logfile;
142
143 my $User = $arv->{'users'}->{'current'}->execute;
144
145 my $Job = {};
146 my $job_id;
147 my $dbh;
148 my $sth;
149 if ($job_has_uuid)
150 {
151   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
152   if (!$force_unlock) {
153     # If some other crunch-job process has grabbed this job (or we see
154     # other evidence that the job is already underway) we exit 111 so
155     # crunch-dispatch (our parent process) doesn't mark the job as
156     # failed.
157     if ($Job->{'is_locked_by_uuid'}) {
158       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'} . ", exiting 111");
159       exit(111);
160     }
161     if ($Job->{'success'} ne undef) {
162       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
163       exit(111);
164     }
165     if ($Job->{'running'}) {
166       Log(undef, "Job 'running' flag is already set");
167       exit(111);
168     }
169     if ($Job->{'started_at'}) {
170       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
171       exit(111);
172     }
173   }
174 }
175 else
176 {
177   $Job = JSON::decode_json($jobspec);
178
179   if (!$resume_stash)
180   {
181     map { croak ("No $_ specified") unless $Job->{$_} }
182     qw(script script_version script_parameters);
183   }
184
185   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186   $Job->{'started_at'} = gmtime;
187
188   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
189
190   $job_has_uuid = 1;
191 }
192 $job_id = $Job->{'uuid'};
193
194 my $keep_logfile = $job_id . '.log.txt';
195 $local_logfile = File::Temp->new();
196
197 $Job->{'runtime_constraints'} ||= {};
198 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
199 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
200
201
202 Log (undef, "check slurm allocation");
203 my @slot;
204 my @node;
205 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
206 my @sinfo;
207 if (!$have_slurm)
208 {
209   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
210   push @sinfo, "$localcpus localhost";
211 }
212 if (exists $ENV{SLURM_NODELIST})
213 {
214   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
215 }
216 foreach (@sinfo)
217 {
218   my ($ncpus, $slurm_nodelist) = split;
219   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
220
221   my @nodelist;
222   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
223   {
224     my $nodelist = $1;
225     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
226     {
227       my $ranges = $1;
228       foreach (split (",", $ranges))
229       {
230         my ($a, $b);
231         if (/(\d+)-(\d+)/)
232         {
233           $a = $1;
234           $b = $2;
235         }
236         else
237         {
238           $a = $_;
239           $b = $_;
240         }
241         push @nodelist, map {
242           my $n = $nodelist;
243           $n =~ s/\[[-,\d]+\]/$_/;
244           $n;
245         } ($a..$b);
246       }
247     }
248     else
249     {
250       push @nodelist, $nodelist;
251     }
252   }
253   foreach my $nodename (@nodelist)
254   {
255     Log (undef, "node $nodename - $ncpus slots");
256     my $node = { name => $nodename,
257                  ncpus => $ncpus,
258                  losing_streak => 0,
259                  hold_until => 0 };
260     foreach my $cpu (1..$ncpus)
261     {
262       push @slot, { node => $node,
263                     cpu => $cpu };
264     }
265   }
266   push @node, @nodelist;
267 }
268
269
270
271 # Ensure that we get one jobstep running on each allocated node before
272 # we start overloading nodes with concurrent steps
273
274 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
275
276
277
278 my $jobmanager_id;
279 if ($job_has_uuid)
280 {
281   # Claim this job, and make sure nobody else does
282   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
283           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
284     Log(undef, "Error while updating / locking job, exiting 111");
285     exit(111);
286   }
287   $Job->update_attributes('started_at' => scalar gmtime,
288                           'running' => 1,
289                           'success' => undef,
290                           'tasks_summary' => { 'failed' => 0,
291                                                'todo' => 1,
292                                                'running' => 0,
293                                                'done' => 0 });
294 }
295
296
297 Log (undef, "start");
298 $SIG{'INT'} = sub { $main::please_freeze = 1; };
299 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
300 $SIG{'TERM'} = \&croak;
301 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
302 $SIG{'ALRM'} = sub { $main::please_info = 1; };
303 $SIG{'CONT'} = sub { $main::please_continue = 1; };
304 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
305
306 $main::please_freeze = 0;
307 $main::please_info = 0;
308 $main::please_continue = 0;
309 $main::please_refresh = 0;
310 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
311
312 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
313 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
314 $ENV{"JOB_UUID"} = $job_id;
315
316
317 my @jobstep;
318 my @jobstep_todo = ();
319 my @jobstep_done = ();
320 my @jobstep_tomerge = ();
321 my $jobstep_tomerge_level = 0;
322 my $squeue_checked;
323 my $squeue_kill_checked;
324 my $output_in_keep = 0;
325 my $latest_refresh = scalar time;
326
327
328
329 if (defined $Job->{thawedfromkey})
330 {
331   thaw ($Job->{thawedfromkey});
332 }
333 else
334 {
335   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
336     'job_uuid' => $Job->{'uuid'},
337     'sequence' => 0,
338     'qsequence' => 0,
339     'parameters' => {},
340                                                           });
341   push @jobstep, { 'level' => 0,
342                    'failures' => 0,
343                    'arvados_task' => $first_task,
344                  };
345   push @jobstep_todo, 0;
346 }
347
348
349 if (!$have_slurm)
350 {
351   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
352 }
353
354
355 my $build_script;
356
357
358 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
359
360 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
361 if ($skip_install)
362 {
363   if (!defined $no_clear_tmp) {
364     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
365     system($clear_tmp_cmd) == 0
366         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
367   }
368   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
369   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
370     if (-d $src_path) {
371       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
372           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
373       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
374           == 0
375           or croak ("setup.py in $src_path failed: exit ".($?>>8));
376     }
377   }
378 }
379 else
380 {
381   do {
382     local $/ = undef;
383     $build_script = <DATA>;
384   };
385   Log (undef, "Install revision ".$Job->{script_version});
386   my $nodelist = join(",", @node);
387
388   if (!defined $no_clear_tmp) {
389     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
390
391     my $cleanpid = fork();
392     if ($cleanpid == 0)
393     {
394       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
395             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
396       exit (1);
397     }
398     while (1)
399     {
400       last if $cleanpid == waitpid (-1, WNOHANG);
401       freeze_if_want_freeze ($cleanpid);
402       select (undef, undef, undef, 0.1);
403     }
404     Log (undef, "Clean-work-dir exited $?");
405   }
406
407   # Install requested code version
408
409   my @execargs;
410   my @srunargs = ("srun",
411                   "--nodelist=$nodelist",
412                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
413
414   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
415   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
416
417   my $commit;
418   my $git_archive;
419   my $treeish = $Job->{'script_version'};
420
421   # If we're running under crunch-dispatch, it will have pulled the
422   # appropriate source tree into its own repository, and given us that
423   # repo's path as $git_dir. If we're running a "local" job, and a
424   # script_version was specified, it's up to the user to provide the
425   # full path to a local repository in Job->{repository}.
426   #
427   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
428   # git-archive --remote where appropriate.
429   #
430   # TODO: Accept a locally-hosted Arvados repository by name or
431   # UUID. Use arvados.v1.repositories.list or .get to figure out the
432   # appropriate fetch-url.
433   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
434
435   $ENV{"CRUNCH_SRC_URL"} = $repo;
436
437   if (-d "$repo/.git") {
438     # We were given a working directory, but we are only interested in
439     # the index.
440     $repo = "$repo/.git";
441   }
442
443   # If this looks like a subversion r#, look for it in git-svn commit messages
444
445   if ($treeish =~ m{^\d{1,4}$}) {
446     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
447     chomp $gitlog;
448     if ($gitlog =~ /^[a-f0-9]{40}$/) {
449       $commit = $gitlog;
450       Log (undef, "Using commit $commit for script_version $treeish");
451     }
452   }
453
454   # If that didn't work, try asking git to look it up as a tree-ish.
455
456   if (!defined $commit) {
457     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
458     chomp $found;
459     if ($found =~ /^[0-9a-f]{40}$/s) {
460       $commit = $found;
461       if ($commit ne $treeish) {
462         # Make sure we record the real commit id in the database,
463         # frozentokey, logs, etc. -- instead of an abbreviation or a
464         # branch name which can become ambiguous or point to a
465         # different commit in the future.
466         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
467         Log (undef, "Using commit $commit for tree-ish $treeish");
468         if ($commit ne $treeish) {
469           $Job->{'script_version'} = $commit;
470           !$job_has_uuid or
471               $Job->update_attributes('script_version' => $commit) or
472               croak("Error while updating job");
473         }
474       }
475     }
476   }
477
478   if (defined $commit) {
479     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
480     @execargs = ("sh", "-c",
481                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
482     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
483   }
484   else {
485     croak ("could not figure out commit id for $treeish");
486   }
487
488   my $installpid = fork();
489   if ($installpid == 0)
490   {
491     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
492     exit (1);
493   }
494   while (1)
495   {
496     last if $installpid == waitpid (-1, WNOHANG);
497     freeze_if_want_freeze ($installpid);
498     select (undef, undef, undef, 0.1);
499   }
500   Log (undef, "Install exited $?");
501 }
502
503 if (!$have_slurm)
504 {
505   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
506   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
507 }
508
509 # If this job requires a Docker image, install that.
510 my $docker_bin = "/usr/bin/docker.io";
511 my ($docker_locator, $docker_stream, $docker_hash);
512 if ($docker_locator = $Job->{docker_image_locator}) {
513   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
514   if (!$docker_hash)
515   {
516     croak("No Docker image hash found from locator $docker_locator");
517   }
518   $docker_stream =~ s/^\.//;
519   my $docker_install_script = qq{
520 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
521     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
522 fi
523 };
524   my $docker_pid = fork();
525   if ($docker_pid == 0)
526   {
527     srun (["srun", "--nodelist=" . join(',', @node)],
528           ["/bin/sh", "-ec", $docker_install_script]);
529     exit ($?);
530   }
531   while (1)
532   {
533     last if $docker_pid == waitpid (-1, WNOHANG);
534     freeze_if_want_freeze ($docker_pid);
535     select (undef, undef, undef, 0.1);
536   }
537   if ($? != 0)
538   {
539     croak("Installing Docker image from $docker_locator returned exit code $?");
540   }
541 }
542
543 foreach (qw (script script_version script_parameters runtime_constraints))
544 {
545   Log (undef,
546        "$_ " .
547        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
548 }
549 foreach (split (/\n/, $Job->{knobs}))
550 {
551   Log (undef, "knob " . $_);
552 }
553
554
555
556 $main::success = undef;
557
558
559
560 ONELEVEL:
561
562 my $thisround_succeeded = 0;
563 my $thisround_failed = 0;
564 my $thisround_failed_multiple = 0;
565
566 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
567                        or $a <=> $b } @jobstep_todo;
568 my $level = $jobstep[$jobstep_todo[0]]->{level};
569 Log (undef, "start level $level");
570
571
572
573 my %proc;
574 my @freeslot = (0..$#slot);
575 my @holdslot;
576 my %reader;
577 my $progress_is_dirty = 1;
578 my $progress_stats_updated = 0;
579
580 update_progress_stats();
581
582
583
584 THISROUND:
585 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
586 {
587   my $id = $jobstep_todo[$todo_ptr];
588   my $Jobstep = $jobstep[$id];
589   if ($Jobstep->{level} != $level)
590   {
591     next;
592   }
593
594   pipe $reader{$id}, "writer" or croak ($!);
595   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
596   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
597
598   my $childslot = $freeslot[0];
599   my $childnode = $slot[$childslot]->{node};
600   my $childslotname = join (".",
601                             $slot[$childslot]->{node}->{name},
602                             $slot[$childslot]->{cpu});
603   my $childpid = fork();
604   if ($childpid == 0)
605   {
606     $SIG{'INT'} = 'DEFAULT';
607     $SIG{'QUIT'} = 'DEFAULT';
608     $SIG{'TERM'} = 'DEFAULT';
609
610     foreach (values (%reader))
611     {
612       close($_);
613     }
614     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
615     open(STDOUT,">&writer");
616     open(STDERR,">&writer");
617
618     undef $dbh;
619     undef $sth;
620
621     delete $ENV{"GNUPGHOME"};
622     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
623     $ENV{"TASK_QSEQUENCE"} = $id;
624     $ENV{"TASK_SEQUENCE"} = $level;
625     $ENV{"JOB_SCRIPT"} = $Job->{script};
626     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
627       $param =~ tr/a-z/A-Z/;
628       $ENV{"JOB_PARAMETER_$param"} = $value;
629     }
630     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
631     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
632     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
633     $ENV{"HOME"} = $ENV{"TASK_WORK"};
634     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
635     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
636     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
637     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
638
639     $ENV{"GZIP"} = "-n";
640
641     my @srunargs = (
642       "srun",
643       "--nodelist=".$childnode->{name},
644       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
645       "--job-name=$job_id.$id.$$",
646         );
647     my $build_script_to_send = "";
648     my $command =
649         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
650         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
651         ."&& cd $ENV{CRUNCH_TMP} ";
652     if ($build_script)
653     {
654       $build_script_to_send = $build_script;
655       $command .=
656           "&& perl -";
657     }
658     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
659     if ($docker_hash)
660     {
661       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
662       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
663       # Dynamically configure the container to use the host system as its
664       # DNS server.  Get the host's global addresses from the ip command,
665       # and turn them into docker --dns options using gawk.
666       $command .=
667           q{$(ip -o address show scope global |
668               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
669       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
670       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
671       $command .= "--env=\QHOME=/home/crunch\E ";
672       while (my ($env_key, $env_val) = each %ENV)
673       {
674         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
675           if ($env_key eq "TASK_WORK") {
676             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
677           }
678           elsif ($env_key eq "TASK_KEEPMOUNT") {
679             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
680           }
681           else {
682             $command .= "--env=\Q$env_key=$env_val\E ";
683           }
684         }
685       }
686       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
687       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
688       $command .= "\Q$docker_hash\E ";
689       $command .= "stdbuf --output=0 --error=0 ";
690       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
691     } else {
692       # Non-docker run
693       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
694       $command .= "stdbuf --output=0 --error=0 ";
695       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
696     }
697
698     my @execargs = ('bash', '-c', $command);
699     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
700     # exec() failed, we assume nothing happened.
701     Log(undef, "srun() failed on build script");
702     die;
703   }
704   close("writer");
705   if (!defined $childpid)
706   {
707     close $reader{$id};
708     delete $reader{$id};
709     next;
710   }
711   shift @freeslot;
712   $proc{$childpid} = { jobstep => $id,
713                        time => time,
714                        slot => $childslot,
715                        jobstepname => "$job_id.$id.$childpid",
716                      };
717   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
718   $slot[$childslot]->{pid} = $childpid;
719
720   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
721   Log ($id, "child $childpid started on $childslotname");
722   $Jobstep->{starttime} = time;
723   $Jobstep->{node} = $childnode->{name};
724   $Jobstep->{slotindex} = $childslot;
725   delete $Jobstep->{stderr};
726   delete $Jobstep->{finishtime};
727
728   splice @jobstep_todo, $todo_ptr, 1;
729   --$todo_ptr;
730
731   $progress_is_dirty = 1;
732
733   while (!@freeslot
734          ||
735          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
736   {
737     last THISROUND if $main::please_freeze;
738     if ($main::please_info)
739     {
740       $main::please_info = 0;
741       freeze();
742       collate_output();
743       save_meta(1);
744       update_progress_stats();
745     }
746     my $gotsome
747         = readfrompipes ()
748         + reapchildren ();
749     if (!$gotsome)
750     {
751       check_refresh_wanted();
752       check_squeue();
753       update_progress_stats();
754       select (undef, undef, undef, 0.1);
755     }
756     elsif (time - $progress_stats_updated >= 30)
757     {
758       update_progress_stats();
759     }
760     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
761         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
762     {
763       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
764           .($thisround_failed+$thisround_succeeded)
765           .") -- giving up on this round";
766       Log (undef, $message);
767       last THISROUND;
768     }
769
770     # move slots from freeslot to holdslot (or back to freeslot) if necessary
771     for (my $i=$#freeslot; $i>=0; $i--) {
772       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
773         push @holdslot, (splice @freeslot, $i, 1);
774       }
775     }
776     for (my $i=$#holdslot; $i>=0; $i--) {
777       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
778         push @freeslot, (splice @holdslot, $i, 1);
779       }
780     }
781
782     # give up if no nodes are succeeding
783     if (!grep { $_->{node}->{losing_streak} == 0 &&
784                     $_->{node}->{hold_count} < 4 } @slot) {
785       my $message = "Every node has failed -- giving up on this round";
786       Log (undef, $message);
787       last THISROUND;
788     }
789   }
790 }
791
792
793 push @freeslot, splice @holdslot;
794 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
795
796
797 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
798 while (%proc)
799 {
800   if ($main::please_continue) {
801     $main::please_continue = 0;
802     goto THISROUND;
803   }
804   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
805   readfrompipes ();
806   if (!reapchildren())
807   {
808     check_refresh_wanted();
809     check_squeue();
810     update_progress_stats();
811     select (undef, undef, undef, 0.1);
812     killem (keys %proc) if $main::please_freeze;
813   }
814 }
815
816 update_progress_stats();
817 freeze_if_want_freeze();
818
819
820 if (!defined $main::success)
821 {
822   if (@jobstep_todo &&
823       $thisround_succeeded == 0 &&
824       ($thisround_failed == 0 || $thisround_failed > 4))
825   {
826     my $message = "stop because $thisround_failed tasks failed and none succeeded";
827     Log (undef, $message);
828     $main::success = 0;
829   }
830   if (!@jobstep_todo)
831   {
832     $main::success = 1;
833   }
834 }
835
836 goto ONELEVEL if !defined $main::success;
837
838
839 release_allocation();
840 freeze();
841 my $collated_output = &collate_output();
842
843 if ($job_has_uuid) {
844   $Job->update_attributes('running' => 0,
845                           'success' => $collated_output && $main::success,
846                           'finished_at' => scalar gmtime)
847 }
848
849 if (!$collated_output) {
850   Log(undef, "output undef");
851 }
852 else {
853   eval {
854     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
855         or die "failed to get collated manifest: $!";
856     # Read the original manifest, and strip permission hints from it,
857     # so we can put the result in a Collection.
858     my @stripped_manifest_lines = ();
859     my $orig_manifest_text = '';
860     while (my $manifest_line = <$orig_manifest>) {
861       $orig_manifest_text .= $manifest_line;
862       my @words = split(/ /, $manifest_line, -1);
863       foreach my $ii (0..$#words) {
864         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
865           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
866         }
867       }
868       push(@stripped_manifest_lines, join(" ", @words));
869     }
870     my $stripped_manifest_text = join("", @stripped_manifest_lines);
871     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
872       'uuid' => md5_hex($stripped_manifest_text),
873       'manifest_text' => $orig_manifest_text,
874     });
875     Log(undef, "output " . $output->{uuid});
876     $Job->update_attributes('output' => $output->{uuid}) if $job_has_uuid;
877     if ($Job->{'output_is_persistent'}) {
878       $arv->{'links'}->{'create'}->execute('link' => {
879         'tail_kind' => 'arvados#user',
880         'tail_uuid' => $User->{'uuid'},
881         'head_kind' => 'arvados#collection',
882         'head_uuid' => $Job->{'output'},
883         'link_class' => 'resources',
884         'name' => 'wants',
885       });
886     }
887   };
888   if ($@) {
889     Log (undef, "Failed to register output manifest: $@");
890   }
891 }
892
893 Log (undef, "finish");
894
895 save_meta();
896 exit 0;
897
898
899
900 sub update_progress_stats
901 {
902   $progress_stats_updated = time;
903   return if !$progress_is_dirty;
904   my ($todo, $done, $running) = (scalar @jobstep_todo,
905                                  scalar @jobstep_done,
906                                  scalar @slot - scalar @freeslot - scalar @holdslot);
907   $Job->{'tasks_summary'} ||= {};
908   $Job->{'tasks_summary'}->{'todo'} = $todo;
909   $Job->{'tasks_summary'}->{'done'} = $done;
910   $Job->{'tasks_summary'}->{'running'} = $running;
911   if ($job_has_uuid) {
912     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
913   }
914   Log (undef, "status: $done done, $running running, $todo todo");
915   $progress_is_dirty = 0;
916 }
917
918
919
920 sub reapchildren
921 {
922   my $pid = waitpid (-1, WNOHANG);
923   return 0 if $pid <= 0;
924
925   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
926                   . "."
927                   . $slot[$proc{$pid}->{slot}]->{cpu});
928   my $jobstepid = $proc{$pid}->{jobstep};
929   my $elapsed = time - $proc{$pid}->{time};
930   my $Jobstep = $jobstep[$jobstepid];
931
932   my $childstatus = $?;
933   my $exitvalue = $childstatus >> 8;
934   my $exitinfo = sprintf("exit %d signal %d%s",
935                          $exitvalue,
936                          $childstatus & 127,
937                          ($childstatus & 128 ? ' core dump' : ''));
938   $Jobstep->{'arvados_task'}->reload;
939   my $task_success = $Jobstep->{'arvados_task'}->{success};
940
941   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
942
943   if (!defined $task_success) {
944     # task did not indicate one way or the other --> fail
945     $Jobstep->{'arvados_task'}->{success} = 0;
946     $Jobstep->{'arvados_task'}->save;
947     $task_success = 0;
948   }
949
950   if (!$task_success)
951   {
952     my $temporary_fail;
953     $temporary_fail ||= $Jobstep->{node_fail};
954     $temporary_fail ||= ($exitvalue == 111);
955
956     ++$thisround_failed;
957     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
958
959     # Check for signs of a failed or misconfigured node
960     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
961         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
962       # Don't count this against jobstep failure thresholds if this
963       # node is already suspected faulty and srun exited quickly
964       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
965           $elapsed < 5) {
966         Log ($jobstepid, "blaming failure on suspect node " .
967              $slot[$proc{$pid}->{slot}]->{node}->{name});
968         $temporary_fail ||= 1;
969       }
970       ban_node_by_slot($proc{$pid}->{slot});
971     }
972
973     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
974                              ++$Jobstep->{'failures'},
975                              $temporary_fail ? 'temporary ' : 'permanent',
976                              $elapsed));
977
978     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
979       # Give up on this task, and the whole job
980       $main::success = 0;
981       $main::please_freeze = 1;
982     }
983     else {
984       # Put this task back on the todo queue
985       push @jobstep_todo, $jobstepid;
986     }
987     $Job->{'tasks_summary'}->{'failed'}++;
988   }
989   else
990   {
991     ++$thisround_succeeded;
992     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
993     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
994     push @jobstep_done, $jobstepid;
995     Log ($jobstepid, "success in $elapsed seconds");
996   }
997   $Jobstep->{exitcode} = $childstatus;
998   $Jobstep->{finishtime} = time;
999   process_stderr ($jobstepid, $task_success);
1000   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1001
1002   close $reader{$jobstepid};
1003   delete $reader{$jobstepid};
1004   delete $slot[$proc{$pid}->{slot}]->{pid};
1005   push @freeslot, $proc{$pid}->{slot};
1006   delete $proc{$pid};
1007
1008   if ($task_success) {
1009     # Load new tasks
1010     my $newtask_list = [];
1011     my $newtask_results;
1012     do {
1013       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1014         'where' => {
1015           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1016         },
1017         'order' => 'qsequence',
1018         'offset' => scalar(@$newtask_list),
1019       );
1020       push(@$newtask_list, @{$newtask_results->{items}});
1021     } while (@{$newtask_results->{items}});
1022     foreach my $arvados_task (@$newtask_list) {
1023       my $jobstep = {
1024         'level' => $arvados_task->{'sequence'},
1025         'failures' => 0,
1026         'arvados_task' => $arvados_task
1027       };
1028       push @jobstep, $jobstep;
1029       push @jobstep_todo, $#jobstep;
1030     }
1031   }
1032
1033   $progress_is_dirty = 1;
1034   1;
1035 }
1036
1037 sub check_refresh_wanted
1038 {
1039   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1040   if (@stat && $stat[9] > $latest_refresh) {
1041     $latest_refresh = scalar time;
1042     if ($job_has_uuid) {
1043       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1044       for my $attr ('cancelled_at',
1045                     'cancelled_by_user_uuid',
1046                     'cancelled_by_client_uuid') {
1047         $Job->{$attr} = $Job2->{$attr};
1048       }
1049       if ($Job->{'cancelled_at'}) {
1050         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1051              " by user " . $Job->{cancelled_by_user_uuid});
1052         $main::success = 0;
1053         $main::please_freeze = 1;
1054       }
1055     }
1056   }
1057 }
1058
1059 sub check_squeue
1060 {
1061   # return if the kill list was checked <4 seconds ago
1062   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1063   {
1064     return;
1065   }
1066   $squeue_kill_checked = time;
1067
1068   # use killem() on procs whose killtime is reached
1069   for (keys %proc)
1070   {
1071     if (exists $proc{$_}->{killtime}
1072         && $proc{$_}->{killtime} <= time)
1073     {
1074       killem ($_);
1075     }
1076   }
1077
1078   # return if the squeue was checked <60 seconds ago
1079   if (defined $squeue_checked && $squeue_checked > time - 60)
1080   {
1081     return;
1082   }
1083   $squeue_checked = time;
1084
1085   if (!$have_slurm)
1086   {
1087     # here is an opportunity to check for mysterious problems with local procs
1088     return;
1089   }
1090
1091   # get a list of steps still running
1092   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1093   chop @squeue;
1094   if ($squeue[-1] ne "ok")
1095   {
1096     return;
1097   }
1098   pop @squeue;
1099
1100   # which of my jobsteps are running, according to squeue?
1101   my %ok;
1102   foreach (@squeue)
1103   {
1104     if (/^(\d+)\.(\d+) (\S+)/)
1105     {
1106       if ($1 eq $ENV{SLURM_JOBID})
1107       {
1108         $ok{$3} = 1;
1109       }
1110     }
1111   }
1112
1113   # which of my active child procs (>60s old) were not mentioned by squeue?
1114   foreach (keys %proc)
1115   {
1116     if ($proc{$_}->{time} < time - 60
1117         && !exists $ok{$proc{$_}->{jobstepname}}
1118         && !exists $proc{$_}->{killtime})
1119     {
1120       # kill this proc if it hasn't exited in 30 seconds
1121       $proc{$_}->{killtime} = time + 30;
1122     }
1123   }
1124 }
1125
1126
1127 sub release_allocation
1128 {
1129   if ($have_slurm)
1130   {
1131     Log (undef, "release job allocation");
1132     system "scancel $ENV{SLURM_JOBID}";
1133   }
1134 }
1135
1136
1137 sub readfrompipes
1138 {
1139   my $gotsome = 0;
1140   foreach my $job (keys %reader)
1141   {
1142     my $buf;
1143     while (0 < sysread ($reader{$job}, $buf, 8192))
1144     {
1145       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1146       $jobstep[$job]->{stderr} .= $buf;
1147       preprocess_stderr ($job);
1148       if (length ($jobstep[$job]->{stderr}) > 16384)
1149       {
1150         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1151       }
1152       $gotsome = 1;
1153     }
1154   }
1155   return $gotsome;
1156 }
1157
1158
1159 sub preprocess_stderr
1160 {
1161   my $job = shift;
1162
1163   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1164     my $line = $1;
1165     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1166     Log ($job, "stderr $line");
1167     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1168       # whoa.
1169       $main::please_freeze = 1;
1170     }
1171     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1172       $jobstep[$job]->{node_fail} = 1;
1173       ban_node_by_slot($jobstep[$job]->{slotindex});
1174     }
1175   }
1176 }
1177
1178
1179 sub process_stderr
1180 {
1181   my $job = shift;
1182   my $task_success = shift;
1183   preprocess_stderr ($job);
1184
1185   map {
1186     Log ($job, "stderr $_");
1187   } split ("\n", $jobstep[$job]->{stderr});
1188 }
1189
1190 sub fetch_block
1191 {
1192   my $hash = shift;
1193   my ($keep, $child_out, $output_block);
1194
1195   my $cmd = "arv-get \Q$hash\E";
1196   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1197   $output_block = '';
1198   while (1) {
1199     my $buf;
1200     my $bytes = sysread($keep, $buf, 1024 * 1024);
1201     if (!defined $bytes) {
1202       die "reading from arv-get: $!";
1203     } elsif ($bytes == 0) {
1204       # sysread returns 0 at the end of the pipe.
1205       last;
1206     } else {
1207       # some bytes were read into buf.
1208       $output_block .= $buf;
1209     }
1210   }
1211   close $keep;
1212   return $output_block;
1213 }
1214
1215 sub collate_output
1216 {
1217   Log (undef, "collate");
1218
1219   my ($child_out, $child_in);
1220   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1221   my $joboutput;
1222   for (@jobstep)
1223   {
1224     next if (!exists $_->{'arvados_task'}->{output} ||
1225              !$_->{'arvados_task'}->{'success'} ||
1226              $_->{'exitcode'} != 0);
1227     my $output = $_->{'arvados_task'}->{output};
1228     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1229     {
1230       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1231       print $child_in $output;
1232     }
1233     elsif (@jobstep == 1)
1234     {
1235       $joboutput = $output;
1236       last;
1237     }
1238     elsif (defined (my $outblock = fetch_block ($output)))
1239     {
1240       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1241       print $child_in $outblock;
1242     }
1243     else
1244     {
1245       Log (undef, "XXX fetch_block($output) failed XXX");
1246       $main::success = 0;
1247     }
1248   }
1249   $child_in->close;
1250
1251   if (!defined $joboutput) {
1252     my $s = IO::Select->new($child_out);
1253     if ($s->can_read(120)) {
1254       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1255       chomp($joboutput);
1256     } else {
1257       Log (undef, "timed out reading from 'arv-put'");
1258     }
1259   }
1260   waitpid($pid, 0);
1261
1262   return $joboutput;
1263 }
1264
1265
1266 sub killem
1267 {
1268   foreach (@_)
1269   {
1270     my $sig = 2;                # SIGINT first
1271     if (exists $proc{$_}->{"sent_$sig"} &&
1272         time - $proc{$_}->{"sent_$sig"} > 4)
1273     {
1274       $sig = 15;                # SIGTERM if SIGINT doesn't work
1275     }
1276     if (exists $proc{$_}->{"sent_$sig"} &&
1277         time - $proc{$_}->{"sent_$sig"} > 4)
1278     {
1279       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1280     }
1281     if (!exists $proc{$_}->{"sent_$sig"})
1282     {
1283       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1284       kill $sig, $_;
1285       select (undef, undef, undef, 0.1);
1286       if ($sig == 2)
1287       {
1288         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1289       }
1290       $proc{$_}->{"sent_$sig"} = time;
1291       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1292     }
1293   }
1294 }
1295
1296
1297 sub fhbits
1298 {
1299   my($bits);
1300   for (@_) {
1301     vec($bits,fileno($_),1) = 1;
1302   }
1303   $bits;
1304 }
1305
1306
1307 sub Log                         # ($jobstep_id, $logmessage)
1308 {
1309   if ($_[1] =~ /\n/) {
1310     for my $line (split (/\n/, $_[1])) {
1311       Log ($_[0], $line);
1312     }
1313     return;
1314   }
1315   my $fh = select STDERR; $|=1; select $fh;
1316   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1317   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1318   $message .= "\n";
1319   my $datetime;
1320   if ($local_logfile || -t STDERR) {
1321     my @gmtime = gmtime;
1322     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1323                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1324   }
1325   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1326
1327   if ($local_logfile) {
1328     print $local_logfile $datetime . " " . $message;
1329   }
1330 }
1331
1332
1333 sub croak
1334 {
1335   my ($package, $file, $line) = caller;
1336   my $message = "@_ at $file line $line\n";
1337   Log (undef, $message);
1338   freeze() if @jobstep_todo;
1339   collate_output() if @jobstep_todo;
1340   cleanup();
1341   save_meta() if $local_logfile;
1342   die;
1343 }
1344
1345
1346 sub cleanup
1347 {
1348   return if !$job_has_uuid;
1349   $Job->update_attributes('running' => 0,
1350                           'success' => 0,
1351                           'finished_at' => scalar gmtime);
1352 }
1353
1354
1355 sub save_meta
1356 {
1357   my $justcheckpoint = shift; # false if this will be the last meta saved
1358   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1359
1360   $local_logfile->flush;
1361   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1362       . quotemeta($local_logfile->filename);
1363   my $loglocator = `$cmd`;
1364   die "system $cmd failed: $?" if $?;
1365   chomp($loglocator);
1366
1367   $local_logfile = undef;   # the temp file is automatically deleted
1368   Log (undef, "log manifest is $loglocator");
1369   $Job->{'log'} = $loglocator;
1370   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1371 }
1372
1373
1374 sub freeze_if_want_freeze
1375 {
1376   if ($main::please_freeze)
1377   {
1378     release_allocation();
1379     if (@_)
1380     {
1381       # kill some srun procs before freeze+stop
1382       map { $proc{$_} = {} } @_;
1383       while (%proc)
1384       {
1385         killem (keys %proc);
1386         select (undef, undef, undef, 0.1);
1387         my $died;
1388         while (($died = waitpid (-1, WNOHANG)) > 0)
1389         {
1390           delete $proc{$died};
1391         }
1392       }
1393     }
1394     freeze();
1395     collate_output();
1396     cleanup();
1397     save_meta();
1398     exit 0;
1399   }
1400 }
1401
1402
1403 sub freeze
1404 {
1405   Log (undef, "Freeze not implemented");
1406   return;
1407 }
1408
1409
1410 sub thaw
1411 {
1412   croak ("Thaw not implemented");
1413 }
1414
1415
1416 sub freezequote
1417 {
1418   my $s = shift;
1419   $s =~ s/\\/\\\\/g;
1420   $s =~ s/\n/\\n/g;
1421   return $s;
1422 }
1423
1424
1425 sub freezeunquote
1426 {
1427   my $s = shift;
1428   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1429   return $s;
1430 }
1431
1432
1433 sub srun
1434 {
1435   my $srunargs = shift;
1436   my $execargs = shift;
1437   my $opts = shift || {};
1438   my $stdin = shift;
1439   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1440   print STDERR (join (" ",
1441                       map { / / ? "'$_'" : $_ }
1442                       (@$args)),
1443                 "\n")
1444       if $ENV{CRUNCH_DEBUG};
1445
1446   if (defined $stdin) {
1447     my $child = open STDIN, "-|";
1448     defined $child or die "no fork: $!";
1449     if ($child == 0) {
1450       print $stdin or die $!;
1451       close STDOUT or die $!;
1452       exit 0;
1453     }
1454   }
1455
1456   return system (@$args) if $opts->{fork};
1457
1458   exec @$args;
1459   warn "ENV size is ".length(join(" ",%ENV));
1460   die "exec failed: $!: @$args";
1461 }
1462
1463
1464 sub ban_node_by_slot {
1465   # Don't start any new jobsteps on this node for 60 seconds
1466   my $slotid = shift;
1467   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1468   $slot[$slotid]->{node}->{hold_count}++;
1469   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1470 }
1471
1472 sub must_lock_now
1473 {
1474   my ($lockfile, $error_message) = @_;
1475   open L, ">", $lockfile or croak("$lockfile: $!");
1476   if (!flock L, LOCK_EX|LOCK_NB) {
1477     croak("Can't lock $lockfile: $error_message\n");
1478   }
1479 }
1480
1481 sub find_docker_image {
1482   # Given a Keep locator, check to see if it contains a Docker image.
1483   # If so, return its stream name and Docker hash.
1484   # If not, return undef for both values.
1485   my $locator = shift;
1486   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1487     my @file_list = @{$image->{files}};
1488     if ((scalar(@file_list) == 1) &&
1489         ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1490       return ($file_list[0][0], $1);
1491     }
1492   }
1493   return (undef, undef);
1494 }
1495
1496 __DATA__
1497 #!/usr/bin/perl
1498
1499 # checkout-and-build
1500
1501 use Fcntl ':flock';
1502
1503 my $destdir = $ENV{"CRUNCH_SRC"};
1504 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1505 my $repo = $ENV{"CRUNCH_SRC_URL"};
1506
1507 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1508 flock L, LOCK_EX;
1509 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1510     exit 0;
1511 }
1512
1513 unlink "$destdir.commit";
1514 open STDOUT, ">", "$destdir.log";
1515 open STDERR, ">&STDOUT";
1516
1517 mkdir $destdir;
1518 my @git_archive_data = <DATA>;
1519 if (@git_archive_data) {
1520   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1521   print TARX @git_archive_data;
1522   if(!close(TARX)) {
1523     die "'tar -C $destdir -xf -' exited $?: $!";
1524   }
1525 }
1526
1527 my $pwd;
1528 chomp ($pwd = `pwd`);
1529 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1530 mkdir $install_dir;
1531
1532 for my $src_path ("$destdir/arvados/sdk/python") {
1533   if (-d $src_path) {
1534     shell_or_die ("virtualenv", $install_dir);
1535     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1536   }
1537 }
1538
1539 if (-e "$destdir/crunch_scripts/install") {
1540     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1541 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1542     # Old version
1543     shell_or_die ("./tests/autotests.sh", $install_dir);
1544 } elsif (-e "./install.sh") {
1545     shell_or_die ("./install.sh", $install_dir);
1546 }
1547
1548 if ($commit) {
1549     unlink "$destdir.commit.new";
1550     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1551     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1552 }
1553
1554 close L;
1555
1556 exit 0;
1557
1558 sub shell_or_die
1559 {
1560   if ($ENV{"DEBUG"}) {
1561     print STDERR "@_\n";
1562   }
1563   system (@_) == 0
1564       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1565 }
1566
1567 __DATA__