Be smarter about cleanup in crunch-job after a job finishes. This should
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 use constant EX_TEMPFAIL => 75;
88
89 $ENV{"TMPDIR"} ||= "/tmp";
90 unless (defined $ENV{"CRUNCH_TMP"}) {
91   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
92   if ($ENV{"USER"} ne "crunch" && $< != 0) {
93     # use a tmp dir unique for my uid
94     $ENV{"CRUNCH_TMP"} .= "-$<";
95   }
96 }
97
98 # Create the tmp directory if it does not exist
99 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
100   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 }
102
103 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
104 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
105 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
106 mkdir ($ENV{"JOB_WORK"});
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # If some other crunch-job process has grabbed this job (or we see
156     # other evidence that the job is already underway) we exit
157     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158     # mark the job as failed.
159     if ($Job->{'is_locked_by_uuid'}) {
160       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
161       exit EX_TEMPFAIL;
162     }
163     if ($Job->{'success'} ne undef) {
164       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
165       exit EX_TEMPFAIL;
166     }
167     if ($Job->{'running'}) {
168       Log(undef, "Job 'running' flag is already set");
169       exit EX_TEMPFAIL;
170     }
171     if ($Job->{'started_at'}) {
172       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
173       exit EX_TEMPFAIL;
174     }
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189
190   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191
192   $job_has_uuid = 1;
193 }
194 $job_id = $Job->{'uuid'};
195
196 my $keep_logfile = $job_id . '.log.txt';
197 $local_logfile = File::Temp->new();
198
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202
203
204 Log (undef, "check slurm allocation");
205 my @slot;
206 my @node;
207 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my @sinfo;
209 if (!$have_slurm)
210 {
211   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
212   push @sinfo, "$localcpus localhost";
213 }
214 if (exists $ENV{SLURM_NODELIST})
215 {
216   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 }
218 foreach (@sinfo)
219 {
220   my ($ncpus, $slurm_nodelist) = split;
221   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
222
223   my @nodelist;
224   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
225   {
226     my $nodelist = $1;
227     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
228     {
229       my $ranges = $1;
230       foreach (split (",", $ranges))
231       {
232         my ($a, $b);
233         if (/(\d+)-(\d+)/)
234         {
235           $a = $1;
236           $b = $2;
237         }
238         else
239         {
240           $a = $_;
241           $b = $_;
242         }
243         push @nodelist, map {
244           my $n = $nodelist;
245           $n =~ s/\[[-,\d]+\]/$_/;
246           $n;
247         } ($a..$b);
248       }
249     }
250     else
251     {
252       push @nodelist, $nodelist;
253     }
254   }
255   foreach my $nodename (@nodelist)
256   {
257     Log (undef, "node $nodename - $ncpus slots");
258     my $node = { name => $nodename,
259                  ncpus => $ncpus,
260                  losing_streak => 0,
261                  hold_until => 0 };
262     foreach my $cpu (1..$ncpus)
263     {
264       push @slot, { node => $node,
265                     cpu => $cpu };
266     }
267   }
268   push @node, @nodelist;
269 }
270
271
272
273 # Ensure that we get one jobstep running on each allocated node before
274 # we start overloading nodes with concurrent steps
275
276 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
277
278
279
280 my $jobmanager_id;
281 if ($job_has_uuid)
282 {
283   # Claim this job, and make sure nobody else does
284   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
285           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
286     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
287     exit EX_TEMPFAIL;
288   }
289   $Job->update_attributes('started_at' => scalar gmtime,
290                           'running' => 1,
291                           'success' => undef,
292                           'tasks_summary' => { 'failed' => 0,
293                                                'todo' => 1,
294                                                'running' => 0,
295                                                'done' => 0 });
296 }
297
298
299 Log (undef, "start");
300 $SIG{'INT'} = sub { $main::please_freeze = 1; };
301 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
302 $SIG{'TERM'} = \&croak;
303 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
304 $SIG{'ALRM'} = sub { $main::please_info = 1; };
305 $SIG{'CONT'} = sub { $main::please_continue = 1; };
306 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
307
308 $main::please_freeze = 0;
309 $main::please_info = 0;
310 $main::please_continue = 0;
311 $main::please_refresh = 0;
312 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
313
314 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
315 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
316 $ENV{"JOB_UUID"} = $job_id;
317
318
319 my @jobstep;
320 my @jobstep_todo = ();
321 my @jobstep_done = ();
322 my @jobstep_tomerge = ();
323 my $jobstep_tomerge_level = 0;
324 my $squeue_checked;
325 my $squeue_kill_checked;
326 my $output_in_keep = 0;
327 my $latest_refresh = scalar time;
328
329
330
331 if (defined $Job->{thawedfromkey})
332 {
333   thaw ($Job->{thawedfromkey});
334 }
335 else
336 {
337   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
338     'job_uuid' => $Job->{'uuid'},
339     'sequence' => 0,
340     'qsequence' => 0,
341     'parameters' => {},
342                                                           });
343   push @jobstep, { 'level' => 0,
344                    'failures' => 0,
345                    'arvados_task' => $first_task,
346                  };
347   push @jobstep_todo, 0;
348 }
349
350
351 if (!$have_slurm)
352 {
353   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
354 }
355
356
357 my $build_script;
358
359
360 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
361
362 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
363 if ($skip_install)
364 {
365   if (!defined $no_clear_tmp) {
366     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
367     system($clear_tmp_cmd) == 0
368         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
369   }
370   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
371   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
372     if (-d $src_path) {
373       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
374           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
375       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
376           == 0
377           or croak ("setup.py in $src_path failed: exit ".($?>>8));
378     }
379   }
380 }
381 else
382 {
383   do {
384     local $/ = undef;
385     $build_script = <DATA>;
386   };
387   Log (undef, "Install revision ".$Job->{script_version});
388   my $nodelist = join(",", @node);
389
390   if (!defined $no_clear_tmp) {
391     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
392
393     my $cleanpid = fork();
394     if ($cleanpid == 0)
395     {
396       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
397             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
398       exit (1);
399     }
400     while (1)
401     {
402       last if $cleanpid == waitpid (-1, WNOHANG);
403       freeze_if_want_freeze ($cleanpid);
404       select (undef, undef, undef, 0.1);
405     }
406     Log (undef, "Clean-work-dir exited $?");
407   }
408
409   # Install requested code version
410
411   my @execargs;
412   my @srunargs = ("srun",
413                   "--nodelist=$nodelist",
414                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
415
416   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
417   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
418
419   my $commit;
420   my $git_archive;
421   my $treeish = $Job->{'script_version'};
422
423   # If we're running under crunch-dispatch, it will have pulled the
424   # appropriate source tree into its own repository, and given us that
425   # repo's path as $git_dir. If we're running a "local" job, and a
426   # script_version was specified, it's up to the user to provide the
427   # full path to a local repository in Job->{repository}.
428   #
429   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
430   # git-archive --remote where appropriate.
431   #
432   # TODO: Accept a locally-hosted Arvados repository by name or
433   # UUID. Use arvados.v1.repositories.list or .get to figure out the
434   # appropriate fetch-url.
435   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
436
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   if (-d "$repo/.git") {
440     # We were given a working directory, but we are only interested in
441     # the index.
442     $repo = "$repo/.git";
443   }
444
445   # If this looks like a subversion r#, look for it in git-svn commit messages
446
447   if ($treeish =~ m{^\d{1,4}$}) {
448     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
449     chomp $gitlog;
450     if ($gitlog =~ /^[a-f0-9]{40}$/) {
451       $commit = $gitlog;
452       Log (undef, "Using commit $commit for script_version $treeish");
453     }
454   }
455
456   # If that didn't work, try asking git to look it up as a tree-ish.
457
458   if (!defined $commit) {
459     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
460     chomp $found;
461     if ($found =~ /^[0-9a-f]{40}$/s) {
462       $commit = $found;
463       if ($commit ne $treeish) {
464         # Make sure we record the real commit id in the database,
465         # frozentokey, logs, etc. -- instead of an abbreviation or a
466         # branch name which can become ambiguous or point to a
467         # different commit in the future.
468         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
469         Log (undef, "Using commit $commit for tree-ish $treeish");
470         if ($commit ne $treeish) {
471           $Job->{'script_version'} = $commit;
472           !$job_has_uuid or
473               $Job->update_attributes('script_version' => $commit) or
474               croak("Error while updating job");
475         }
476       }
477     }
478   }
479
480   if (defined $commit) {
481     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482     @execargs = ("sh", "-c",
483                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485   }
486   else {
487     croak ("could not figure out commit id for $treeish");
488   }
489
490   my $installpid = fork();
491   if ($installpid == 0)
492   {
493     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
494     exit (1);
495   }
496   while (1)
497   {
498     last if $installpid == waitpid (-1, WNOHANG);
499     freeze_if_want_freeze ($installpid);
500     select (undef, undef, undef, 0.1);
501   }
502   Log (undef, "Install exited $?");
503 }
504
505 if (!$have_slurm)
506 {
507   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
508   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
509 }
510
511 # If this job requires a Docker image, install that.
512 my $docker_bin = "/usr/bin/docker.io";
513 my ($docker_locator, $docker_stream, $docker_hash);
514 if ($docker_locator = $Job->{docker_image_locator}) {
515   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
516   if (!$docker_hash)
517   {
518     croak("No Docker image hash found from locator $docker_locator");
519   }
520   $docker_stream =~ s/^\.//;
521   my $docker_install_script = qq{
522 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
523     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
524 fi
525 };
526   my $docker_pid = fork();
527   if ($docker_pid == 0)
528   {
529     srun (["srun", "--nodelist=" . join(',', @node)],
530           ["/bin/sh", "-ec", $docker_install_script]);
531     exit ($?);
532   }
533   while (1)
534   {
535     last if $docker_pid == waitpid (-1, WNOHANG);
536     freeze_if_want_freeze ($docker_pid);
537     select (undef, undef, undef, 0.1);
538   }
539   if ($? != 0)
540   {
541     croak("Installing Docker image from $docker_locator returned exit code $?");
542   }
543 }
544
545 foreach (qw (script script_version script_parameters runtime_constraints))
546 {
547   Log (undef,
548        "$_ " .
549        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
550 }
551 foreach (split (/\n/, $Job->{knobs}))
552 {
553   Log (undef, "knob " . $_);
554 }
555
556
557
558 $main::success = undef;
559
560
561
562 ONELEVEL:
563
564 my $thisround_succeeded = 0;
565 my $thisround_failed = 0;
566 my $thisround_failed_multiple = 0;
567
568 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
569                        or $a <=> $b } @jobstep_todo;
570 my $level = $jobstep[$jobstep_todo[0]]->{level};
571 Log (undef, "start level $level");
572
573
574
575 my %proc;
576 my @freeslot = (0..$#slot);
577 my @holdslot;
578 my %reader;
579 my $progress_is_dirty = 1;
580 my $progress_stats_updated = 0;
581
582 update_progress_stats();
583
584
585
586 THISROUND:
587 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
588 {
589   my $id = $jobstep_todo[$todo_ptr];
590   my $Jobstep = $jobstep[$id];
591   if ($Jobstep->{level} != $level)
592   {
593     next;
594   }
595
596   pipe $reader{$id}, "writer" or croak ($!);
597   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
598   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
599
600   my $childslot = $freeslot[0];
601   my $childnode = $slot[$childslot]->{node};
602   my $childslotname = join (".",
603                             $slot[$childslot]->{node}->{name},
604                             $slot[$childslot]->{cpu});
605   my $childpid = fork();
606   if ($childpid == 0)
607   {
608     $SIG{'INT'} = 'DEFAULT';
609     $SIG{'QUIT'} = 'DEFAULT';
610     $SIG{'TERM'} = 'DEFAULT';
611
612     foreach (values (%reader))
613     {
614       close($_);
615     }
616     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
617     open(STDOUT,">&writer");
618     open(STDERR,">&writer");
619
620     undef $dbh;
621     undef $sth;
622
623     delete $ENV{"GNUPGHOME"};
624     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
625     $ENV{"TASK_QSEQUENCE"} = $id;
626     $ENV{"TASK_SEQUENCE"} = $level;
627     $ENV{"JOB_SCRIPT"} = $Job->{script};
628     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
629       $param =~ tr/a-z/A-Z/;
630       $ENV{"JOB_PARAMETER_$param"} = $value;
631     }
632     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
633     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
634     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
635     $ENV{"HOME"} = $ENV{"TASK_WORK"};
636     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
637     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
638     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
639     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
640
641     $ENV{"GZIP"} = "-n";
642
643     my @srunargs = (
644       "srun",
645       "--nodelist=".$childnode->{name},
646       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
647       "--job-name=$job_id.$id.$$",
648         );
649     my $build_script_to_send = "";
650     my $command =
651         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
652         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
653         ."&& cd $ENV{CRUNCH_TMP} ";
654     if ($build_script)
655     {
656       $build_script_to_send = $build_script;
657       $command .=
658           "&& perl -";
659     }
660     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
661     if ($docker_hash)
662     {
663       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665       # Dynamically configure the container to use the host system as its
666       # DNS server.  Get the host's global addresses from the ip command,
667       # and turn them into docker --dns options using gawk.
668       $command .=
669           q{$(ip -o address show scope global |
670               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
672       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
673       $command .= "--env=\QHOME=/home/crunch\E ";
674       while (my ($env_key, $env_val) = each %ENV)
675       {
676         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
677           if ($env_key eq "TASK_WORK") {
678             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
679           }
680           elsif ($env_key eq "TASK_KEEPMOUNT") {
681             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
682           }
683           else {
684             $command .= "--env=\Q$env_key=$env_val\E ";
685           }
686         }
687       }
688       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
689       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
690       $command .= "\Q$docker_hash\E ";
691       $command .= "stdbuf --output=0 --error=0 ";
692       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
693     } else {
694       # Non-docker run
695       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
696       $command .= "stdbuf --output=0 --error=0 ";
697       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
698     }
699
700     my @execargs = ('bash', '-c', $command);
701     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
702     # exec() failed, we assume nothing happened.
703     Log(undef, "srun() failed on build script");
704     die;
705   }
706   close("writer");
707   if (!defined $childpid)
708   {
709     close $reader{$id};
710     delete $reader{$id};
711     next;
712   }
713   shift @freeslot;
714   $proc{$childpid} = { jobstep => $id,
715                        time => time,
716                        slot => $childslot,
717                        jobstepname => "$job_id.$id.$childpid",
718                      };
719   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
720   $slot[$childslot]->{pid} = $childpid;
721
722   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
723   Log ($id, "child $childpid started on $childslotname");
724   $Jobstep->{starttime} = time;
725   $Jobstep->{node} = $childnode->{name};
726   $Jobstep->{slotindex} = $childslot;
727   delete $Jobstep->{stderr};
728   delete $Jobstep->{finishtime};
729
730   splice @jobstep_todo, $todo_ptr, 1;
731   --$todo_ptr;
732
733   $progress_is_dirty = 1;
734
735   while (!@freeslot
736          ||
737          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
738   {
739     last THISROUND if $main::please_freeze;
740     if ($main::please_info)
741     {
742       $main::please_info = 0;
743       freeze();
744       collate_output();
745       save_meta(1);
746       update_progress_stats();
747     }
748     my $gotsome
749         = readfrompipes ()
750         + reapchildren ();
751     if (!$gotsome)
752     {
753       check_refresh_wanted();
754       check_squeue();
755       update_progress_stats();
756       select (undef, undef, undef, 0.1);
757     }
758     elsif (time - $progress_stats_updated >= 30)
759     {
760       update_progress_stats();
761     }
762     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
763         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
764     {
765       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
766           .($thisround_failed+$thisround_succeeded)
767           .") -- giving up on this round";
768       Log (undef, $message);
769       last THISROUND;
770     }
771
772     # move slots from freeslot to holdslot (or back to freeslot) if necessary
773     for (my $i=$#freeslot; $i>=0; $i--) {
774       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
775         push @holdslot, (splice @freeslot, $i, 1);
776       }
777     }
778     for (my $i=$#holdslot; $i>=0; $i--) {
779       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
780         push @freeslot, (splice @holdslot, $i, 1);
781       }
782     }
783
784     # give up if no nodes are succeeding
785     if (!grep { $_->{node}->{losing_streak} == 0 &&
786                     $_->{node}->{hold_count} < 4 } @slot) {
787       my $message = "Every node has failed -- giving up on this round";
788       Log (undef, $message);
789       last THISROUND;
790     }
791   }
792 }
793
794
795 push @freeslot, splice @holdslot;
796 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
797
798
799 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
800 while (%proc)
801 {
802   if ($main::please_continue) {
803     $main::please_continue = 0;
804     goto THISROUND;
805   }
806   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
807   readfrompipes ();
808   if (!reapchildren())
809   {
810     check_refresh_wanted();
811     check_squeue();
812     update_progress_stats();
813     select (undef, undef, undef, 0.1);
814     killem (keys %proc) if $main::please_freeze;
815   }
816 }
817
818 update_progress_stats();
819 freeze_if_want_freeze();
820
821
822 if (!defined $main::success)
823 {
824   if (@jobstep_todo &&
825       $thisround_succeeded == 0 &&
826       ($thisround_failed == 0 || $thisround_failed > 4))
827   {
828     my $message = "stop because $thisround_failed tasks failed and none succeeded";
829     Log (undef, $message);
830     $main::success = 0;
831   }
832   if (!@jobstep_todo)
833   {
834     $main::success = 1;
835   }
836 }
837
838 goto ONELEVEL if !defined $main::success;
839
840
841 release_allocation();
842 freeze();
843 my $collated_output = &collate_output();
844
845 if ($job_has_uuid) {
846   $Job->update_attributes('running' => 0,
847                           'success' => $collated_output && $main::success,
848                           'finished_at' => scalar gmtime)
849 }
850
851 if (!$collated_output) {
852   Log(undef, "output undef");
853 }
854 else {
855   eval {
856     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
857         or die "failed to get collated manifest: $!";
858     # Read the original manifest, and strip permission hints from it,
859     # so we can put the result in a Collection.
860     my @stripped_manifest_lines = ();
861     my $orig_manifest_text = '';
862     while (my $manifest_line = <$orig_manifest>) {
863       $orig_manifest_text .= $manifest_line;
864       my @words = split(/ /, $manifest_line, -1);
865       foreach my $ii (0..$#words) {
866         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
867           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
868         }
869       }
870       push(@stripped_manifest_lines, join(" ", @words));
871     }
872     my $stripped_manifest_text = join("", @stripped_manifest_lines);
873     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
874       'uuid' => md5_hex($stripped_manifest_text),
875       'manifest_text' => $orig_manifest_text,
876     });
877     Log(undef, "output " . $output->{uuid});
878     $Job->update_attributes('output' => $output->{uuid}) if $job_has_uuid;
879     if ($Job->{'output_is_persistent'}) {
880       $arv->{'links'}->{'create'}->execute('link' => {
881         'tail_kind' => 'arvados#user',
882         'tail_uuid' => $User->{'uuid'},
883         'head_kind' => 'arvados#collection',
884         'head_uuid' => $Job->{'output'},
885         'link_class' => 'resources',
886         'name' => 'wants',
887       });
888     }
889   };
890   if ($@) {
891     Log (undef, "Failed to register output manifest: $@");
892   }
893 }
894
895 Log (undef, "finish");
896
897 save_meta();
898 exit ($Job->{'success'} ? 1 : 0);
899
900
901
902 sub update_progress_stats
903 {
904   $progress_stats_updated = time;
905   return if !$progress_is_dirty;
906   my ($todo, $done, $running) = (scalar @jobstep_todo,
907                                  scalar @jobstep_done,
908                                  scalar @slot - scalar @freeslot - scalar @holdslot);
909   $Job->{'tasks_summary'} ||= {};
910   $Job->{'tasks_summary'}->{'todo'} = $todo;
911   $Job->{'tasks_summary'}->{'done'} = $done;
912   $Job->{'tasks_summary'}->{'running'} = $running;
913   if ($job_has_uuid) {
914     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
915   }
916   Log (undef, "status: $done done, $running running, $todo todo");
917   $progress_is_dirty = 0;
918 }
919
920
921
922 sub reapchildren
923 {
924   my $pid = waitpid (-1, WNOHANG);
925   return 0 if $pid <= 0;
926
927   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
928                   . "."
929                   . $slot[$proc{$pid}->{slot}]->{cpu});
930   my $jobstepid = $proc{$pid}->{jobstep};
931   my $elapsed = time - $proc{$pid}->{time};
932   my $Jobstep = $jobstep[$jobstepid];
933
934   my $childstatus = $?;
935   my $exitvalue = $childstatus >> 8;
936   my $exitinfo = sprintf("exit %d signal %d%s",
937                          $exitvalue,
938                          $childstatus & 127,
939                          ($childstatus & 128 ? ' core dump' : ''));
940   $Jobstep->{'arvados_task'}->reload;
941   my $task_success = $Jobstep->{'arvados_task'}->{success};
942
943   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
944
945   if (!defined $task_success) {
946     # task did not indicate one way or the other --> fail
947     $Jobstep->{'arvados_task'}->{success} = 0;
948     $Jobstep->{'arvados_task'}->save;
949     $task_success = 0;
950   }
951
952   if (!$task_success)
953   {
954     my $temporary_fail;
955     $temporary_fail ||= $Jobstep->{node_fail};
956     $temporary_fail ||= ($exitvalue == 111);
957
958     ++$thisround_failed;
959     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
960
961     # Check for signs of a failed or misconfigured node
962     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
963         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
964       # Don't count this against jobstep failure thresholds if this
965       # node is already suspected faulty and srun exited quickly
966       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
967           $elapsed < 5) {
968         Log ($jobstepid, "blaming failure on suspect node " .
969              $slot[$proc{$pid}->{slot}]->{node}->{name});
970         $temporary_fail ||= 1;
971       }
972       ban_node_by_slot($proc{$pid}->{slot});
973     }
974
975     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
976                              ++$Jobstep->{'failures'},
977                              $temporary_fail ? 'temporary ' : 'permanent',
978                              $elapsed));
979
980     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
981       # Give up on this task, and the whole job
982       $main::success = 0;
983       $main::please_freeze = 1;
984     }
985     else {
986       # Put this task back on the todo queue
987       push @jobstep_todo, $jobstepid;
988     }
989     $Job->{'tasks_summary'}->{'failed'}++;
990   }
991   else
992   {
993     ++$thisround_succeeded;
994     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
995     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
996     push @jobstep_done, $jobstepid;
997     Log ($jobstepid, "success in $elapsed seconds");
998   }
999   $Jobstep->{exitcode} = $childstatus;
1000   $Jobstep->{finishtime} = time;
1001   process_stderr ($jobstepid, $task_success);
1002   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1003
1004   close $reader{$jobstepid};
1005   delete $reader{$jobstepid};
1006   delete $slot[$proc{$pid}->{slot}]->{pid};
1007   push @freeslot, $proc{$pid}->{slot};
1008   delete $proc{$pid};
1009
1010   if ($task_success) {
1011     # Load new tasks
1012     my $newtask_list = [];
1013     my $newtask_results;
1014     do {
1015       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1016         'where' => {
1017           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1018         },
1019         'order' => 'qsequence',
1020         'offset' => scalar(@$newtask_list),
1021       );
1022       push(@$newtask_list, @{$newtask_results->{items}});
1023     } while (@{$newtask_results->{items}});
1024     foreach my $arvados_task (@$newtask_list) {
1025       my $jobstep = {
1026         'level' => $arvados_task->{'sequence'},
1027         'failures' => 0,
1028         'arvados_task' => $arvados_task
1029       };
1030       push @jobstep, $jobstep;
1031       push @jobstep_todo, $#jobstep;
1032     }
1033   }
1034
1035   $progress_is_dirty = 1;
1036   1;
1037 }
1038
1039 sub check_refresh_wanted
1040 {
1041   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1042   if (@stat && $stat[9] > $latest_refresh) {
1043     $latest_refresh = scalar time;
1044     if ($job_has_uuid) {
1045       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1046       for my $attr ('cancelled_at',
1047                     'cancelled_by_user_uuid',
1048                     'cancelled_by_client_uuid') {
1049         $Job->{$attr} = $Job2->{$attr};
1050       }
1051       if ($Job->{'cancelled_at'}) {
1052         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1053              " by user " . $Job->{cancelled_by_user_uuid});
1054         $main::success = 0;
1055         $main::please_freeze = 1;
1056       }
1057     }
1058   }
1059 }
1060
1061 sub check_squeue
1062 {
1063   # return if the kill list was checked <4 seconds ago
1064   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1065   {
1066     return;
1067   }
1068   $squeue_kill_checked = time;
1069
1070   # use killem() on procs whose killtime is reached
1071   for (keys %proc)
1072   {
1073     if (exists $proc{$_}->{killtime}
1074         && $proc{$_}->{killtime} <= time)
1075     {
1076       killem ($_);
1077     }
1078   }
1079
1080   # return if the squeue was checked <60 seconds ago
1081   if (defined $squeue_checked && $squeue_checked > time - 60)
1082   {
1083     return;
1084   }
1085   $squeue_checked = time;
1086
1087   if (!$have_slurm)
1088   {
1089     # here is an opportunity to check for mysterious problems with local procs
1090     return;
1091   }
1092
1093   # get a list of steps still running
1094   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1095   chop @squeue;
1096   if ($squeue[-1] ne "ok")
1097   {
1098     return;
1099   }
1100   pop @squeue;
1101
1102   # which of my jobsteps are running, according to squeue?
1103   my %ok;
1104   foreach (@squeue)
1105   {
1106     if (/^(\d+)\.(\d+) (\S+)/)
1107     {
1108       if ($1 eq $ENV{SLURM_JOBID})
1109       {
1110         $ok{$3} = 1;
1111       }
1112     }
1113   }
1114
1115   # which of my active child procs (>60s old) were not mentioned by squeue?
1116   foreach (keys %proc)
1117   {
1118     if ($proc{$_}->{time} < time - 60
1119         && !exists $ok{$proc{$_}->{jobstepname}}
1120         && !exists $proc{$_}->{killtime})
1121     {
1122       # kill this proc if it hasn't exited in 30 seconds
1123       $proc{$_}->{killtime} = time + 30;
1124     }
1125   }
1126 }
1127
1128
1129 sub release_allocation
1130 {
1131   if ($have_slurm)
1132   {
1133     Log (undef, "release job allocation");
1134     system "scancel $ENV{SLURM_JOBID}";
1135   }
1136 }
1137
1138
1139 sub readfrompipes
1140 {
1141   my $gotsome = 0;
1142   foreach my $job (keys %reader)
1143   {
1144     my $buf;
1145     while (0 < sysread ($reader{$job}, $buf, 8192))
1146     {
1147       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1148       $jobstep[$job]->{stderr} .= $buf;
1149       preprocess_stderr ($job);
1150       if (length ($jobstep[$job]->{stderr}) > 16384)
1151       {
1152         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1153       }
1154       $gotsome = 1;
1155     }
1156   }
1157   return $gotsome;
1158 }
1159
1160
1161 sub preprocess_stderr
1162 {
1163   my $job = shift;
1164
1165   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1166     my $line = $1;
1167     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1168     Log ($job, "stderr $line");
1169     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1170       # whoa.
1171       $main::please_freeze = 1;
1172     }
1173     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1174       $jobstep[$job]->{node_fail} = 1;
1175       ban_node_by_slot($jobstep[$job]->{slotindex});
1176     }
1177   }
1178 }
1179
1180
1181 sub process_stderr
1182 {
1183   my $job = shift;
1184   my $task_success = shift;
1185   preprocess_stderr ($job);
1186
1187   map {
1188     Log ($job, "stderr $_");
1189   } split ("\n", $jobstep[$job]->{stderr});
1190 }
1191
1192 sub fetch_block
1193 {
1194   my $hash = shift;
1195   my ($keep, $child_out, $output_block);
1196
1197   my $cmd = "arv-get \Q$hash\E";
1198   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1199   $output_block = '';
1200   while (1) {
1201     my $buf;
1202     my $bytes = sysread($keep, $buf, 1024 * 1024);
1203     if (!defined $bytes) {
1204       die "reading from arv-get: $!";
1205     } elsif ($bytes == 0) {
1206       # sysread returns 0 at the end of the pipe.
1207       last;
1208     } else {
1209       # some bytes were read into buf.
1210       $output_block .= $buf;
1211     }
1212   }
1213   close $keep;
1214   return $output_block;
1215 }
1216
1217 sub collate_output
1218 {
1219   Log (undef, "collate");
1220
1221   my ($child_out, $child_in);
1222   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1223   my $joboutput;
1224   for (@jobstep)
1225   {
1226     next if (!exists $_->{'arvados_task'}->{'output'} ||
1227              !$_->{'arvados_task'}->{'success'});
1228     my $output = $_->{'arvados_task'}->{output};
1229     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1230     {
1231       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1232       print $child_in $output;
1233     }
1234     elsif (@jobstep == 1)
1235     {
1236       $joboutput = $output;
1237       last;
1238     }
1239     elsif (defined (my $outblock = fetch_block ($output)))
1240     {
1241       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1242       print $child_in $outblock;
1243     }
1244     else
1245     {
1246       Log (undef, "XXX fetch_block($output) failed XXX");
1247       $main::success = 0;
1248     }
1249   }
1250   $child_in->close;
1251
1252   if (!defined $joboutput) {
1253     my $s = IO::Select->new($child_out);
1254     if ($s->can_read(120)) {
1255       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1256       chomp($joboutput);
1257     } else {
1258       Log (undef, "timed out reading from 'arv-put'");
1259     }
1260   }
1261   waitpid($pid, 0);
1262
1263   return $joboutput;
1264 }
1265
1266
1267 sub killem
1268 {
1269   foreach (@_)
1270   {
1271     my $sig = 2;                # SIGINT first
1272     if (exists $proc{$_}->{"sent_$sig"} &&
1273         time - $proc{$_}->{"sent_$sig"} > 4)
1274     {
1275       $sig = 15;                # SIGTERM if SIGINT doesn't work
1276     }
1277     if (exists $proc{$_}->{"sent_$sig"} &&
1278         time - $proc{$_}->{"sent_$sig"} > 4)
1279     {
1280       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1281     }
1282     if (!exists $proc{$_}->{"sent_$sig"})
1283     {
1284       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1285       kill $sig, $_;
1286       select (undef, undef, undef, 0.1);
1287       if ($sig == 2)
1288       {
1289         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1290       }
1291       $proc{$_}->{"sent_$sig"} = time;
1292       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1293     }
1294   }
1295 }
1296
1297
1298 sub fhbits
1299 {
1300   my($bits);
1301   for (@_) {
1302     vec($bits,fileno($_),1) = 1;
1303   }
1304   $bits;
1305 }
1306
1307
1308 sub Log                         # ($jobstep_id, $logmessage)
1309 {
1310   if ($_[1] =~ /\n/) {
1311     for my $line (split (/\n/, $_[1])) {
1312       Log ($_[0], $line);
1313     }
1314     return;
1315   }
1316   my $fh = select STDERR; $|=1; select $fh;
1317   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1318   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1319   $message .= "\n";
1320   my $datetime;
1321   if ($local_logfile || -t STDERR) {
1322     my @gmtime = gmtime;
1323     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1324                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1325   }
1326   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1327
1328   if ($local_logfile) {
1329     print $local_logfile $datetime . " " . $message;
1330   }
1331 }
1332
1333
1334 sub croak
1335 {
1336   my ($package, $file, $line) = caller;
1337   my $message = "@_ at $file line $line\n";
1338   Log (undef, $message);
1339   freeze() if @jobstep_todo;
1340   collate_output() if @jobstep_todo;
1341   cleanup();
1342   save_meta() if $local_logfile;
1343   die;
1344 }
1345
1346
1347 sub cleanup
1348 {
1349   return if !$job_has_uuid;
1350   $Job->update_attributes('running' => 0,
1351                           'success' => 0,
1352                           'finished_at' => scalar gmtime);
1353 }
1354
1355
1356 sub save_meta
1357 {
1358   my $justcheckpoint = shift; # false if this will be the last meta saved
1359   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1360
1361   $local_logfile->flush;
1362   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1363       . quotemeta($local_logfile->filename);
1364   my $loglocator = `$cmd`;
1365   die "system $cmd failed: $?" if $?;
1366   chomp($loglocator);
1367
1368   $local_logfile = undef;   # the temp file is automatically deleted
1369   Log (undef, "log manifest is $loglocator");
1370   $Job->{'log'} = $loglocator;
1371   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1372 }
1373
1374
1375 sub freeze_if_want_freeze
1376 {
1377   if ($main::please_freeze)
1378   {
1379     release_allocation();
1380     if (@_)
1381     {
1382       # kill some srun procs before freeze+stop
1383       map { $proc{$_} = {} } @_;
1384       while (%proc)
1385       {
1386         killem (keys %proc);
1387         select (undef, undef, undef, 0.1);
1388         my $died;
1389         while (($died = waitpid (-1, WNOHANG)) > 0)
1390         {
1391           delete $proc{$died};
1392         }
1393       }
1394     }
1395     freeze();
1396     collate_output();
1397     cleanup();
1398     save_meta();
1399     exit 0;
1400   }
1401 }
1402
1403
1404 sub freeze
1405 {
1406   Log (undef, "Freeze not implemented");
1407   return;
1408 }
1409
1410
1411 sub thaw
1412 {
1413   croak ("Thaw not implemented");
1414 }
1415
1416
1417 sub freezequote
1418 {
1419   my $s = shift;
1420   $s =~ s/\\/\\\\/g;
1421   $s =~ s/\n/\\n/g;
1422   return $s;
1423 }
1424
1425
1426 sub freezeunquote
1427 {
1428   my $s = shift;
1429   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1430   return $s;
1431 }
1432
1433
1434 sub srun
1435 {
1436   my $srunargs = shift;
1437   my $execargs = shift;
1438   my $opts = shift || {};
1439   my $stdin = shift;
1440   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1441   print STDERR (join (" ",
1442                       map { / / ? "'$_'" : $_ }
1443                       (@$args)),
1444                 "\n")
1445       if $ENV{CRUNCH_DEBUG};
1446
1447   if (defined $stdin) {
1448     my $child = open STDIN, "-|";
1449     defined $child or die "no fork: $!";
1450     if ($child == 0) {
1451       print $stdin or die $!;
1452       close STDOUT or die $!;
1453       exit 0;
1454     }
1455   }
1456
1457   return system (@$args) if $opts->{fork};
1458
1459   exec @$args;
1460   warn "ENV size is ".length(join(" ",%ENV));
1461   die "exec failed: $!: @$args";
1462 }
1463
1464
1465 sub ban_node_by_slot {
1466   # Don't start any new jobsteps on this node for 60 seconds
1467   my $slotid = shift;
1468   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1469   $slot[$slotid]->{node}->{hold_count}++;
1470   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1471 }
1472
1473 sub must_lock_now
1474 {
1475   my ($lockfile, $error_message) = @_;
1476   open L, ">", $lockfile or croak("$lockfile: $!");
1477   if (!flock L, LOCK_EX|LOCK_NB) {
1478     croak("Can't lock $lockfile: $error_message\n");
1479   }
1480 }
1481
1482 sub find_docker_image {
1483   # Given a Keep locator, check to see if it contains a Docker image.
1484   # If so, return its stream name and Docker hash.
1485   # If not, return undef for both values.
1486   my $locator = shift;
1487   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1488     my @file_list = @{$image->{files}};
1489     if ((scalar(@file_list) == 1) &&
1490         ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1491       return ($file_list[0][0], $1);
1492     }
1493   }
1494   return (undef, undef);
1495 }
1496
1497 __DATA__
1498 #!/usr/bin/perl
1499
1500 # checkout-and-build
1501
1502 use Fcntl ':flock';
1503
1504 my $destdir = $ENV{"CRUNCH_SRC"};
1505 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1506 my $repo = $ENV{"CRUNCH_SRC_URL"};
1507
1508 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1509 flock L, LOCK_EX;
1510 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1511     exit 0;
1512 }
1513
1514 unlink "$destdir.commit";
1515 open STDOUT, ">", "$destdir.log";
1516 open STDERR, ">&STDOUT";
1517
1518 mkdir $destdir;
1519 my @git_archive_data = <DATA>;
1520 if (@git_archive_data) {
1521   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1522   print TARX @git_archive_data;
1523   if(!close(TARX)) {
1524     die "'tar -C $destdir -xf -' exited $?: $!";
1525   }
1526 }
1527
1528 my $pwd;
1529 chomp ($pwd = `pwd`);
1530 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1531 mkdir $install_dir;
1532
1533 for my $src_path ("$destdir/arvados/sdk/python") {
1534   if (-d $src_path) {
1535     shell_or_die ("virtualenv", $install_dir);
1536     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1537   }
1538 }
1539
1540 if (-e "$destdir/crunch_scripts/install") {
1541     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1542 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1543     # Old version
1544     shell_or_die ("./tests/autotests.sh", $install_dir);
1545 } elsif (-e "./install.sh") {
1546     shell_or_die ("./install.sh", $install_dir);
1547 }
1548
1549 if ($commit) {
1550     unlink "$destdir.commit.new";
1551     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1552     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1553 }
1554
1555 close L;
1556
1557 exit 0;
1558
1559 sub shell_or_die
1560 {
1561   if ($ENV{"DEBUG"}) {
1562     print STDERR "@_\n";
1563   }
1564   system (@_) == 0
1565       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1566 }
1567
1568 __DATA__