Merge branch 'master' into 3296-user-profile
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 use constant EX_TEMPFAIL => 75;
88
89 $ENV{"TMPDIR"} ||= "/tmp";
90 unless (defined $ENV{"CRUNCH_TMP"}) {
91   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
92   if ($ENV{"USER"} ne "crunch" && $< != 0) {
93     # use a tmp dir unique for my uid
94     $ENV{"CRUNCH_TMP"} .= "-$<";
95   }
96 }
97
98 # Create the tmp directory if it does not exist
99 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
100   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 }
102
103 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
104 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
105 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
106 mkdir ($ENV{"JOB_WORK"});
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # If some other crunch-job process has grabbed this job (or we see
156     # other evidence that the job is already underway) we exit
157     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158     # mark the job as failed.
159     if ($Job->{'is_locked_by_uuid'}) {
160       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
161       exit EX_TEMPFAIL;
162     }
163     if ($Job->{'success'} ne undef) {
164       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
165       exit EX_TEMPFAIL;
166     }
167     if ($Job->{'running'}) {
168       Log(undef, "Job 'running' flag is already set");
169       exit EX_TEMPFAIL;
170     }
171     if ($Job->{'started_at'}) {
172       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
173       exit EX_TEMPFAIL;
174     }
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189
190   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191
192   $job_has_uuid = 1;
193 }
194 $job_id = $Job->{'uuid'};
195
196 my $keep_logfile = $job_id . '.log.txt';
197 $local_logfile = File::Temp->new();
198
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202
203
204 Log (undef, "check slurm allocation");
205 my @slot;
206 my @node;
207 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my @sinfo;
209 if (!$have_slurm)
210 {
211   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
212   push @sinfo, "$localcpus localhost";
213 }
214 if (exists $ENV{SLURM_NODELIST})
215 {
216   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 }
218 foreach (@sinfo)
219 {
220   my ($ncpus, $slurm_nodelist) = split;
221   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
222
223   my @nodelist;
224   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
225   {
226     my $nodelist = $1;
227     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
228     {
229       my $ranges = $1;
230       foreach (split (",", $ranges))
231       {
232         my ($a, $b);
233         if (/(\d+)-(\d+)/)
234         {
235           $a = $1;
236           $b = $2;
237         }
238         else
239         {
240           $a = $_;
241           $b = $_;
242         }
243         push @nodelist, map {
244           my $n = $nodelist;
245           $n =~ s/\[[-,\d]+\]/$_/;
246           $n;
247         } ($a..$b);
248       }
249     }
250     else
251     {
252       push @nodelist, $nodelist;
253     }
254   }
255   foreach my $nodename (@nodelist)
256   {
257     Log (undef, "node $nodename - $ncpus slots");
258     my $node = { name => $nodename,
259                  ncpus => $ncpus,
260                  losing_streak => 0,
261                  hold_until => 0 };
262     foreach my $cpu (1..$ncpus)
263     {
264       push @slot, { node => $node,
265                     cpu => $cpu };
266     }
267   }
268   push @node, @nodelist;
269 }
270
271
272
273 # Ensure that we get one jobstep running on each allocated node before
274 # we start overloading nodes with concurrent steps
275
276 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
277
278
279
280 my $jobmanager_id;
281 if ($job_has_uuid)
282 {
283   # Claim this job, and make sure nobody else does
284   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
285           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
286     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
287     exit EX_TEMPFAIL;
288   }
289   $Job->update_attributes('started_at' => scalar gmtime,
290                           'running' => 1,
291                           'success' => undef,
292                           'tasks_summary' => { 'failed' => 0,
293                                                'todo' => 1,
294                                                'running' => 0,
295                                                'done' => 0 });
296 }
297
298
299 Log (undef, "start");
300 $SIG{'INT'} = sub { $main::please_freeze = 1; };
301 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
302 $SIG{'TERM'} = \&croak;
303 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
304 $SIG{'ALRM'} = sub { $main::please_info = 1; };
305 $SIG{'CONT'} = sub { $main::please_continue = 1; };
306 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
307
308 $main::please_freeze = 0;
309 $main::please_info = 0;
310 $main::please_continue = 0;
311 $main::please_refresh = 0;
312 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
313
314 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
315 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
316 $ENV{"JOB_UUID"} = $job_id;
317
318
319 my @jobstep;
320 my @jobstep_todo = ();
321 my @jobstep_done = ();
322 my @jobstep_tomerge = ();
323 my $jobstep_tomerge_level = 0;
324 my $squeue_checked;
325 my $squeue_kill_checked;
326 my $output_in_keep = 0;
327 my $latest_refresh = scalar time;
328
329
330
331 if (defined $Job->{thawedfromkey})
332 {
333   thaw ($Job->{thawedfromkey});
334 }
335 else
336 {
337   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
338     'job_uuid' => $Job->{'uuid'},
339     'sequence' => 0,
340     'qsequence' => 0,
341     'parameters' => {},
342                                                           });
343   push @jobstep, { 'level' => 0,
344                    'failures' => 0,
345                    'arvados_task' => $first_task,
346                  };
347   push @jobstep_todo, 0;
348 }
349
350
351 if (!$have_slurm)
352 {
353   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
354 }
355
356
357 my $build_script;
358
359
360 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
361
362 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
363 if ($skip_install)
364 {
365   if (!defined $no_clear_tmp) {
366     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
367     system($clear_tmp_cmd) == 0
368         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
369   }
370   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
371   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
372     if (-d $src_path) {
373       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
374           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
375       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
376           == 0
377           or croak ("setup.py in $src_path failed: exit ".($?>>8));
378     }
379   }
380 }
381 else
382 {
383   do {
384     local $/ = undef;
385     $build_script = <DATA>;
386   };
387   Log (undef, "Install revision ".$Job->{script_version});
388   my $nodelist = join(",", @node);
389
390   if (!defined $no_clear_tmp) {
391     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
392
393     my $cleanpid = fork();
394     if ($cleanpid == 0)
395     {
396       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
397             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
398       exit (1);
399     }
400     while (1)
401     {
402       last if $cleanpid == waitpid (-1, WNOHANG);
403       freeze_if_want_freeze ($cleanpid);
404       select (undef, undef, undef, 0.1);
405     }
406     Log (undef, "Clean-work-dir exited $?");
407   }
408
409   # Install requested code version
410
411   my @execargs;
412   my @srunargs = ("srun",
413                   "--nodelist=$nodelist",
414                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
415
416   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
417   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
418
419   my $commit;
420   my $git_archive;
421   my $treeish = $Job->{'script_version'};
422
423   # If we're running under crunch-dispatch, it will have pulled the
424   # appropriate source tree into its own repository, and given us that
425   # repo's path as $git_dir. If we're running a "local" job, and a
426   # script_version was specified, it's up to the user to provide the
427   # full path to a local repository in Job->{repository}.
428   #
429   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
430   # git-archive --remote where appropriate.
431   #
432   # TODO: Accept a locally-hosted Arvados repository by name or
433   # UUID. Use arvados.v1.repositories.list or .get to figure out the
434   # appropriate fetch-url.
435   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
436
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   if (-d "$repo/.git") {
440     # We were given a working directory, but we are only interested in
441     # the index.
442     $repo = "$repo/.git";
443   }
444
445   # If this looks like a subversion r#, look for it in git-svn commit messages
446
447   if ($treeish =~ m{^\d{1,4}$}) {
448     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
449     chomp $gitlog;
450     if ($gitlog =~ /^[a-f0-9]{40}$/) {
451       $commit = $gitlog;
452       Log (undef, "Using commit $commit for script_version $treeish");
453     }
454   }
455
456   # If that didn't work, try asking git to look it up as a tree-ish.
457
458   if (!defined $commit) {
459     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
460     chomp $found;
461     if ($found =~ /^[0-9a-f]{40}$/s) {
462       $commit = $found;
463       if ($commit ne $treeish) {
464         # Make sure we record the real commit id in the database,
465         # frozentokey, logs, etc. -- instead of an abbreviation or a
466         # branch name which can become ambiguous or point to a
467         # different commit in the future.
468         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
469         Log (undef, "Using commit $commit for tree-ish $treeish");
470         if ($commit ne $treeish) {
471           $Job->{'script_version'} = $commit;
472           !$job_has_uuid or
473               $Job->update_attributes('script_version' => $commit) or
474               croak("Error while updating job");
475         }
476       }
477     }
478   }
479
480   if (defined $commit) {
481     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482     @execargs = ("sh", "-c",
483                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485   }
486   else {
487     croak ("could not figure out commit id for $treeish");
488   }
489
490   my $installpid = fork();
491   if ($installpid == 0)
492   {
493     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
494     exit (1);
495   }
496   while (1)
497   {
498     last if $installpid == waitpid (-1, WNOHANG);
499     freeze_if_want_freeze ($installpid);
500     select (undef, undef, undef, 0.1);
501   }
502   Log (undef, "Install exited $?");
503 }
504
505 if (!$have_slurm)
506 {
507   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
508   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
509 }
510
511 # If this job requires a Docker image, install that.
512 my $docker_bin = "/usr/bin/docker.io";
513 my ($docker_locator, $docker_stream, $docker_hash);
514 if ($docker_locator = $Job->{docker_image_locator}) {
515   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
516   if (!$docker_hash)
517   {
518     croak("No Docker image hash found from locator $docker_locator");
519   }
520   $docker_stream =~ s/^\.//;
521   my $docker_install_script = qq{
522 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
523     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
524 fi
525 };
526   my $docker_pid = fork();
527   if ($docker_pid == 0)
528   {
529     srun (["srun", "--nodelist=" . join(',', @node)],
530           ["/bin/sh", "-ec", $docker_install_script]);
531     exit ($?);
532   }
533   while (1)
534   {
535     last if $docker_pid == waitpid (-1, WNOHANG);
536     freeze_if_want_freeze ($docker_pid);
537     select (undef, undef, undef, 0.1);
538   }
539   if ($? != 0)
540   {
541     croak("Installing Docker image from $docker_locator returned exit code $?");
542   }
543 }
544
545 foreach (qw (script script_version script_parameters runtime_constraints))
546 {
547   Log (undef,
548        "$_ " .
549        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
550 }
551 foreach (split (/\n/, $Job->{knobs}))
552 {
553   Log (undef, "knob " . $_);
554 }
555
556
557
558 $main::success = undef;
559
560
561
562 ONELEVEL:
563
564 my $thisround_succeeded = 0;
565 my $thisround_failed = 0;
566 my $thisround_failed_multiple = 0;
567
568 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
569                        or $a <=> $b } @jobstep_todo;
570 my $level = $jobstep[$jobstep_todo[0]]->{level};
571 Log (undef, "start level $level");
572
573
574
575 my %proc;
576 my @freeslot = (0..$#slot);
577 my @holdslot;
578 my %reader;
579 my $progress_is_dirty = 1;
580 my $progress_stats_updated = 0;
581
582 update_progress_stats();
583
584
585
586 THISROUND:
587 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
588 {
589   my $id = $jobstep_todo[$todo_ptr];
590   my $Jobstep = $jobstep[$id];
591   if ($Jobstep->{level} != $level)
592   {
593     next;
594   }
595
596   pipe $reader{$id}, "writer" or croak ($!);
597   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
598   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
599
600   my $childslot = $freeslot[0];
601   my $childnode = $slot[$childslot]->{node};
602   my $childslotname = join (".",
603                             $slot[$childslot]->{node}->{name},
604                             $slot[$childslot]->{cpu});
605   my $childpid = fork();
606   if ($childpid == 0)
607   {
608     $SIG{'INT'} = 'DEFAULT';
609     $SIG{'QUIT'} = 'DEFAULT';
610     $SIG{'TERM'} = 'DEFAULT';
611
612     foreach (values (%reader))
613     {
614       close($_);
615     }
616     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
617     open(STDOUT,">&writer");
618     open(STDERR,">&writer");
619
620     undef $dbh;
621     undef $sth;
622
623     delete $ENV{"GNUPGHOME"};
624     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
625     $ENV{"TASK_QSEQUENCE"} = $id;
626     $ENV{"TASK_SEQUENCE"} = $level;
627     $ENV{"JOB_SCRIPT"} = $Job->{script};
628     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
629       $param =~ tr/a-z/A-Z/;
630       $ENV{"JOB_PARAMETER_$param"} = $value;
631     }
632     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
633     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
634     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
635     $ENV{"HOME"} = $ENV{"TASK_WORK"};
636     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
637     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
638     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
639     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
640
641     $ENV{"GZIP"} = "-n";
642
643     my @srunargs = (
644       "srun",
645       "--nodelist=".$childnode->{name},
646       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
647       "--job-name=$job_id.$id.$$",
648         );
649     my $build_script_to_send = "";
650     my $command =
651         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
652         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
653         ."&& cd $ENV{CRUNCH_TMP} ";
654     if ($build_script)
655     {
656       $build_script_to_send = $build_script;
657       $command .=
658           "&& perl -";
659     }
660     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
661     if ($docker_hash)
662     {
663       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665       # Dynamically configure the container to use the host system as its
666       # DNS server.  Get the host's global addresses from the ip command,
667       # and turn them into docker --dns options using gawk.
668       $command .=
669           q{$(ip -o address show scope global |
670               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
672       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
673       $command .= "--env=\QHOME=/home/crunch\E ";
674       while (my ($env_key, $env_val) = each %ENV)
675       {
676         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
677           if ($env_key eq "TASK_WORK") {
678             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
679           }
680           elsif ($env_key eq "TASK_KEEPMOUNT") {
681             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
682           }
683           else {
684             $command .= "--env=\Q$env_key=$env_val\E ";
685           }
686         }
687       }
688       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
689       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
690       $command .= "\Q$docker_hash\E ";
691       $command .= "stdbuf --output=0 --error=0 ";
692       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
693     } else {
694       # Non-docker run
695       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
696       $command .= "stdbuf --output=0 --error=0 ";
697       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
698     }
699
700     my @execargs = ('bash', '-c', $command);
701     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
702     # exec() failed, we assume nothing happened.
703     Log(undef, "srun() failed on build script");
704     die;
705   }
706   close("writer");
707   if (!defined $childpid)
708   {
709     close $reader{$id};
710     delete $reader{$id};
711     next;
712   }
713   shift @freeslot;
714   $proc{$childpid} = { jobstep => $id,
715                        time => time,
716                        slot => $childslot,
717                        jobstepname => "$job_id.$id.$childpid",
718                      };
719   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
720   $slot[$childslot]->{pid} = $childpid;
721
722   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
723   Log ($id, "child $childpid started on $childslotname");
724   $Jobstep->{starttime} = time;
725   $Jobstep->{node} = $childnode->{name};
726   $Jobstep->{slotindex} = $childslot;
727   delete $Jobstep->{stderr};
728   delete $Jobstep->{finishtime};
729
730   splice @jobstep_todo, $todo_ptr, 1;
731   --$todo_ptr;
732
733   $progress_is_dirty = 1;
734
735   while (!@freeslot
736          ||
737          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
738   {
739     last THISROUND if $main::please_freeze;
740     if ($main::please_info)
741     {
742       $main::please_info = 0;
743       freeze();
744       collate_output();
745       save_meta(1);
746       update_progress_stats();
747     }
748     my $gotsome
749         = readfrompipes ()
750         + reapchildren ();
751     if (!$gotsome)
752     {
753       check_refresh_wanted();
754       check_squeue();
755       update_progress_stats();
756       select (undef, undef, undef, 0.1);
757     }
758     elsif (time - $progress_stats_updated >= 30)
759     {
760       update_progress_stats();
761     }
762     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
763         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
764     {
765       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
766           .($thisround_failed+$thisround_succeeded)
767           .") -- giving up on this round";
768       Log (undef, $message);
769       last THISROUND;
770     }
771
772     # move slots from freeslot to holdslot (or back to freeslot) if necessary
773     for (my $i=$#freeslot; $i>=0; $i--) {
774       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
775         push @holdslot, (splice @freeslot, $i, 1);
776       }
777     }
778     for (my $i=$#holdslot; $i>=0; $i--) {
779       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
780         push @freeslot, (splice @holdslot, $i, 1);
781       }
782     }
783
784     # give up if no nodes are succeeding
785     if (!grep { $_->{node}->{losing_streak} == 0 &&
786                     $_->{node}->{hold_count} < 4 } @slot) {
787       my $message = "Every node has failed -- giving up on this round";
788       Log (undef, $message);
789       last THISROUND;
790     }
791   }
792 }
793
794
795 push @freeslot, splice @holdslot;
796 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
797
798
799 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
800 while (%proc)
801 {
802   if ($main::please_continue) {
803     $main::please_continue = 0;
804     goto THISROUND;
805   }
806   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
807   readfrompipes ();
808   if (!reapchildren())
809   {
810     check_refresh_wanted();
811     check_squeue();
812     update_progress_stats();
813     select (undef, undef, undef, 0.1);
814     killem (keys %proc) if $main::please_freeze;
815   }
816 }
817
818 update_progress_stats();
819 freeze_if_want_freeze();
820
821
822 if (!defined $main::success)
823 {
824   if (@jobstep_todo &&
825       $thisround_succeeded == 0 &&
826       ($thisround_failed == 0 || $thisround_failed > 4))
827   {
828     my $message = "stop because $thisround_failed tasks failed and none succeeded";
829     Log (undef, $message);
830     $main::success = 0;
831   }
832   if (!@jobstep_todo)
833   {
834     $main::success = 1;
835   }
836 }
837
838 goto ONELEVEL if !defined $main::success;
839
840
841 release_allocation();
842 freeze();
843 my $collated_output = &collate_output();
844
845 if ($job_has_uuid) {
846   $Job->update_attributes('running' => 0,
847                           'success' => $collated_output && $main::success,
848                           'finished_at' => scalar gmtime)
849 }
850
851 if (!$collated_output) {
852   Log(undef, "output undef");
853 }
854 else {
855   eval {
856     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
857         or die "failed to get collated manifest: $!";
858     # Read the original manifest, and strip permission hints from it,
859     # so we can put the result in a Collection.
860     my @stripped_manifest_lines = ();
861     my $orig_manifest_text = '';
862     while (my $manifest_line = <$orig_manifest>) {
863       $orig_manifest_text .= $manifest_line;
864       my @words = split(/ /, $manifest_line, -1);
865       foreach my $ii (0..$#words) {
866         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
867           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
868         }
869       }
870       push(@stripped_manifest_lines, join(" ", @words));
871     }
872     my $stripped_manifest_text = join("", @stripped_manifest_lines);
873     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
874       'uuid' => md5_hex($stripped_manifest_text),
875       'manifest_text' => $orig_manifest_text,
876     });
877     Log(undef, "output " . $output->{uuid});
878     $Job->update_attributes('output' => $output->{uuid}) if $job_has_uuid;
879     if ($Job->{'output_is_persistent'}) {
880       $arv->{'links'}->{'create'}->execute('link' => {
881         'tail_kind' => 'arvados#user',
882         'tail_uuid' => $User->{'uuid'},
883         'head_kind' => 'arvados#collection',
884         'head_uuid' => $Job->{'output'},
885         'link_class' => 'resources',
886         'name' => 'wants',
887       });
888     }
889   };
890   if ($@) {
891     Log (undef, "Failed to register output manifest: $@");
892   }
893 }
894
895 Log (undef, "finish");
896
897 save_meta();
898 exit ($Job->{'success'} ? 1 : 0);
899
900
901
902 sub update_progress_stats
903 {
904   $progress_stats_updated = time;
905   return if !$progress_is_dirty;
906   my ($todo, $done, $running) = (scalar @jobstep_todo,
907                                  scalar @jobstep_done,
908                                  scalar @slot - scalar @freeslot - scalar @holdslot);
909   $Job->{'tasks_summary'} ||= {};
910   $Job->{'tasks_summary'}->{'todo'} = $todo;
911   $Job->{'tasks_summary'}->{'done'} = $done;
912   $Job->{'tasks_summary'}->{'running'} = $running;
913   if ($job_has_uuid) {
914     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
915   }
916   Log (undef, "status: $done done, $running running, $todo todo");
917   $progress_is_dirty = 0;
918 }
919
920
921
922 sub reapchildren
923 {
924   my $pid = waitpid (-1, WNOHANG);
925   return 0 if $pid <= 0;
926
927   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
928                   . "."
929                   . $slot[$proc{$pid}->{slot}]->{cpu});
930   my $jobstepid = $proc{$pid}->{jobstep};
931   my $elapsed = time - $proc{$pid}->{time};
932   my $Jobstep = $jobstep[$jobstepid];
933
934   my $childstatus = $?;
935   my $exitvalue = $childstatus >> 8;
936   my $exitinfo = sprintf("exit %d signal %d%s",
937                          $exitvalue,
938                          $childstatus & 127,
939                          ($childstatus & 128 ? ' core dump' : ''));
940   $Jobstep->{'arvados_task'}->reload;
941   my $task_success = $Jobstep->{'arvados_task'}->{success};
942
943   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
944
945   if (!defined $task_success) {
946     # task did not indicate one way or the other --> fail
947     $Jobstep->{'arvados_task'}->{success} = 0;
948     $Jobstep->{'arvados_task'}->save;
949     $task_success = 0;
950   }
951
952   if (!$task_success)
953   {
954     my $temporary_fail;
955     $temporary_fail ||= $Jobstep->{node_fail};
956     $temporary_fail ||= ($exitvalue == 111);
957
958     ++$thisround_failed;
959     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
960
961     # Check for signs of a failed or misconfigured node
962     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
963         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
964       # Don't count this against jobstep failure thresholds if this
965       # node is already suspected faulty and srun exited quickly
966       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
967           $elapsed < 5) {
968         Log ($jobstepid, "blaming failure on suspect node " .
969              $slot[$proc{$pid}->{slot}]->{node}->{name});
970         $temporary_fail ||= 1;
971       }
972       ban_node_by_slot($proc{$pid}->{slot});
973     }
974
975     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
976                              ++$Jobstep->{'failures'},
977                              $temporary_fail ? 'temporary ' : 'permanent',
978                              $elapsed));
979
980     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
981       # Give up on this task, and the whole job
982       $main::success = 0;
983       $main::please_freeze = 1;
984     }
985     else {
986       # Put this task back on the todo queue
987       push @jobstep_todo, $jobstepid;
988     }
989     $Job->{'tasks_summary'}->{'failed'}++;
990   }
991   else
992   {
993     ++$thisround_succeeded;
994     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
995     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
996     push @jobstep_done, $jobstepid;
997     Log ($jobstepid, "success in $elapsed seconds");
998   }
999   $Jobstep->{exitcode} = $childstatus;
1000   $Jobstep->{finishtime} = time;
1001   process_stderr ($jobstepid, $task_success);
1002   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1003
1004   close $reader{$jobstepid};
1005   delete $reader{$jobstepid};
1006   delete $slot[$proc{$pid}->{slot}]->{pid};
1007   push @freeslot, $proc{$pid}->{slot};
1008   delete $proc{$pid};
1009
1010   if ($task_success) {
1011     # Load new tasks
1012     my $newtask_list = [];
1013     my $newtask_results;
1014     do {
1015       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1016         'where' => {
1017           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1018         },
1019         'order' => 'qsequence',
1020         'offset' => scalar(@$newtask_list),
1021       );
1022       push(@$newtask_list, @{$newtask_results->{items}});
1023     } while (@{$newtask_results->{items}});
1024     foreach my $arvados_task (@$newtask_list) {
1025       my $jobstep = {
1026         'level' => $arvados_task->{'sequence'},
1027         'failures' => 0,
1028         'arvados_task' => $arvados_task
1029       };
1030       push @jobstep, $jobstep;
1031       push @jobstep_todo, $#jobstep;
1032     }
1033   }
1034
1035   $progress_is_dirty = 1;
1036   1;
1037 }
1038
1039 sub check_refresh_wanted
1040 {
1041   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1042   if (@stat && $stat[9] > $latest_refresh) {
1043     $latest_refresh = scalar time;
1044     if ($job_has_uuid) {
1045       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1046       for my $attr ('cancelled_at',
1047                     'cancelled_by_user_uuid',
1048                     'cancelled_by_client_uuid') {
1049         $Job->{$attr} = $Job2->{$attr};
1050       }
1051       if ($Job->{'cancelled_at'}) {
1052         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1053              " by user " . $Job->{cancelled_by_user_uuid});
1054         $main::success = 0;
1055         $main::please_freeze = 1;
1056       }
1057     }
1058   }
1059 }
1060
1061 sub check_squeue
1062 {
1063   # return if the kill list was checked <4 seconds ago
1064   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1065   {
1066     return;
1067   }
1068   $squeue_kill_checked = time;
1069
1070   # use killem() on procs whose killtime is reached
1071   for (keys %proc)
1072   {
1073     if (exists $proc{$_}->{killtime}
1074         && $proc{$_}->{killtime} <= time)
1075     {
1076       killem ($_);
1077     }
1078   }
1079
1080   # return if the squeue was checked <60 seconds ago
1081   if (defined $squeue_checked && $squeue_checked > time - 60)
1082   {
1083     return;
1084   }
1085   $squeue_checked = time;
1086
1087   if (!$have_slurm)
1088   {
1089     # here is an opportunity to check for mysterious problems with local procs
1090     return;
1091   }
1092
1093   # get a list of steps still running
1094   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1095   chop @squeue;
1096   if ($squeue[-1] ne "ok")
1097   {
1098     return;
1099   }
1100   pop @squeue;
1101
1102   # which of my jobsteps are running, according to squeue?
1103   my %ok;
1104   foreach (@squeue)
1105   {
1106     if (/^(\d+)\.(\d+) (\S+)/)
1107     {
1108       if ($1 eq $ENV{SLURM_JOBID})
1109       {
1110         $ok{$3} = 1;
1111       }
1112     }
1113   }
1114
1115   # which of my active child procs (>60s old) were not mentioned by squeue?
1116   foreach (keys %proc)
1117   {
1118     if ($proc{$_}->{time} < time - 60
1119         && !exists $ok{$proc{$_}->{jobstepname}}
1120         && !exists $proc{$_}->{killtime})
1121     {
1122       # kill this proc if it hasn't exited in 30 seconds
1123       $proc{$_}->{killtime} = time + 30;
1124     }
1125   }
1126 }
1127
1128
1129 sub release_allocation
1130 {
1131   if ($have_slurm)
1132   {
1133     Log (undef, "release job allocation");
1134     system "scancel $ENV{SLURM_JOBID}";
1135   }
1136 }
1137
1138
1139 sub readfrompipes
1140 {
1141   my $gotsome = 0;
1142   foreach my $job (keys %reader)
1143   {
1144     my $buf;
1145     while (0 < sysread ($reader{$job}, $buf, 8192))
1146     {
1147       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1148       $jobstep[$job]->{stderr} .= $buf;
1149       preprocess_stderr ($job);
1150       if (length ($jobstep[$job]->{stderr}) > 16384)
1151       {
1152         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1153       }
1154       $gotsome = 1;
1155     }
1156   }
1157   return $gotsome;
1158 }
1159
1160
1161 sub preprocess_stderr
1162 {
1163   my $job = shift;
1164
1165   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1166     my $line = $1;
1167     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1168     Log ($job, "stderr $line");
1169     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1170       # whoa.
1171       $main::please_freeze = 1;
1172     }
1173     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1174       $jobstep[$job]->{node_fail} = 1;
1175       ban_node_by_slot($jobstep[$job]->{slotindex});
1176     }
1177   }
1178 }
1179
1180
1181 sub process_stderr
1182 {
1183   my $job = shift;
1184   my $task_success = shift;
1185   preprocess_stderr ($job);
1186
1187   map {
1188     Log ($job, "stderr $_");
1189   } split ("\n", $jobstep[$job]->{stderr});
1190 }
1191
1192 sub fetch_block
1193 {
1194   my $hash = shift;
1195   my ($keep, $child_out, $output_block);
1196
1197   my $cmd = "arv-get \Q$hash\E";
1198   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1199   $output_block = '';
1200   while (1) {
1201     my $buf;
1202     my $bytes = sysread($keep, $buf, 1024 * 1024);
1203     if (!defined $bytes) {
1204       die "reading from arv-get: $!";
1205     } elsif ($bytes == 0) {
1206       # sysread returns 0 at the end of the pipe.
1207       last;
1208     } else {
1209       # some bytes were read into buf.
1210       $output_block .= $buf;
1211     }
1212   }
1213   close $keep;
1214   return $output_block;
1215 }
1216
1217 sub collate_output
1218 {
1219   Log (undef, "collate");
1220
1221   my ($child_out, $child_in);
1222   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1223   my $joboutput;
1224   for (@jobstep)
1225   {
1226     next if (!exists $_->{'arvados_task'}->{output} ||
1227              !$_->{'arvados_task'}->{'success'} ||
1228              $_->{'exitcode'} != 0);
1229     my $output = $_->{'arvados_task'}->{output};
1230     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1231     {
1232       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1233       print $child_in $output;
1234     }
1235     elsif (@jobstep == 1)
1236     {
1237       $joboutput = $output;
1238       last;
1239     }
1240     elsif (defined (my $outblock = fetch_block ($output)))
1241     {
1242       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1243       print $child_in $outblock;
1244     }
1245     else
1246     {
1247       Log (undef, "XXX fetch_block($output) failed XXX");
1248       $main::success = 0;
1249     }
1250   }
1251   $child_in->close;
1252
1253   if (!defined $joboutput) {
1254     my $s = IO::Select->new($child_out);
1255     if ($s->can_read(120)) {
1256       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1257       chomp($joboutput);
1258     } else {
1259       Log (undef, "timed out reading from 'arv-put'");
1260     }
1261   }
1262   waitpid($pid, 0);
1263
1264   return $joboutput;
1265 }
1266
1267
1268 sub killem
1269 {
1270   foreach (@_)
1271   {
1272     my $sig = 2;                # SIGINT first
1273     if (exists $proc{$_}->{"sent_$sig"} &&
1274         time - $proc{$_}->{"sent_$sig"} > 4)
1275     {
1276       $sig = 15;                # SIGTERM if SIGINT doesn't work
1277     }
1278     if (exists $proc{$_}->{"sent_$sig"} &&
1279         time - $proc{$_}->{"sent_$sig"} > 4)
1280     {
1281       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1282     }
1283     if (!exists $proc{$_}->{"sent_$sig"})
1284     {
1285       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1286       kill $sig, $_;
1287       select (undef, undef, undef, 0.1);
1288       if ($sig == 2)
1289       {
1290         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1291       }
1292       $proc{$_}->{"sent_$sig"} = time;
1293       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1294     }
1295   }
1296 }
1297
1298
1299 sub fhbits
1300 {
1301   my($bits);
1302   for (@_) {
1303     vec($bits,fileno($_),1) = 1;
1304   }
1305   $bits;
1306 }
1307
1308
1309 sub Log                         # ($jobstep_id, $logmessage)
1310 {
1311   if ($_[1] =~ /\n/) {
1312     for my $line (split (/\n/, $_[1])) {
1313       Log ($_[0], $line);
1314     }
1315     return;
1316   }
1317   my $fh = select STDERR; $|=1; select $fh;
1318   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1319   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1320   $message .= "\n";
1321   my $datetime;
1322   if ($local_logfile || -t STDERR) {
1323     my @gmtime = gmtime;
1324     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1325                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1326   }
1327   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1328
1329   if ($local_logfile) {
1330     print $local_logfile $datetime . " " . $message;
1331   }
1332 }
1333
1334
1335 sub croak
1336 {
1337   my ($package, $file, $line) = caller;
1338   my $message = "@_ at $file line $line\n";
1339   Log (undef, $message);
1340   freeze() if @jobstep_todo;
1341   collate_output() if @jobstep_todo;
1342   cleanup();
1343   save_meta() if $local_logfile;
1344   die;
1345 }
1346
1347
1348 sub cleanup
1349 {
1350   return if !$job_has_uuid;
1351   $Job->update_attributes('running' => 0,
1352                           'success' => 0,
1353                           'finished_at' => scalar gmtime);
1354 }
1355
1356
1357 sub save_meta
1358 {
1359   my $justcheckpoint = shift; # false if this will be the last meta saved
1360   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1361
1362   $local_logfile->flush;
1363   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1364       . quotemeta($local_logfile->filename);
1365   my $loglocator = `$cmd`;
1366   die "system $cmd failed: $?" if $?;
1367   chomp($loglocator);
1368
1369   $local_logfile = undef;   # the temp file is automatically deleted
1370   Log (undef, "log manifest is $loglocator");
1371   $Job->{'log'} = $loglocator;
1372   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1373 }
1374
1375
1376 sub freeze_if_want_freeze
1377 {
1378   if ($main::please_freeze)
1379   {
1380     release_allocation();
1381     if (@_)
1382     {
1383       # kill some srun procs before freeze+stop
1384       map { $proc{$_} = {} } @_;
1385       while (%proc)
1386       {
1387         killem (keys %proc);
1388         select (undef, undef, undef, 0.1);
1389         my $died;
1390         while (($died = waitpid (-1, WNOHANG)) > 0)
1391         {
1392           delete $proc{$died};
1393         }
1394       }
1395     }
1396     freeze();
1397     collate_output();
1398     cleanup();
1399     save_meta();
1400     exit 0;
1401   }
1402 }
1403
1404
1405 sub freeze
1406 {
1407   Log (undef, "Freeze not implemented");
1408   return;
1409 }
1410
1411
1412 sub thaw
1413 {
1414   croak ("Thaw not implemented");
1415 }
1416
1417
1418 sub freezequote
1419 {
1420   my $s = shift;
1421   $s =~ s/\\/\\\\/g;
1422   $s =~ s/\n/\\n/g;
1423   return $s;
1424 }
1425
1426
1427 sub freezeunquote
1428 {
1429   my $s = shift;
1430   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1431   return $s;
1432 }
1433
1434
1435 sub srun
1436 {
1437   my $srunargs = shift;
1438   my $execargs = shift;
1439   my $opts = shift || {};
1440   my $stdin = shift;
1441   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1442   print STDERR (join (" ",
1443                       map { / / ? "'$_'" : $_ }
1444                       (@$args)),
1445                 "\n")
1446       if $ENV{CRUNCH_DEBUG};
1447
1448   if (defined $stdin) {
1449     my $child = open STDIN, "-|";
1450     defined $child or die "no fork: $!";
1451     if ($child == 0) {
1452       print $stdin or die $!;
1453       close STDOUT or die $!;
1454       exit 0;
1455     }
1456   }
1457
1458   return system (@$args) if $opts->{fork};
1459
1460   exec @$args;
1461   warn "ENV size is ".length(join(" ",%ENV));
1462   die "exec failed: $!: @$args";
1463 }
1464
1465
1466 sub ban_node_by_slot {
1467   # Don't start any new jobsteps on this node for 60 seconds
1468   my $slotid = shift;
1469   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1470   $slot[$slotid]->{node}->{hold_count}++;
1471   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1472 }
1473
1474 sub must_lock_now
1475 {
1476   my ($lockfile, $error_message) = @_;
1477   open L, ">", $lockfile or croak("$lockfile: $!");
1478   if (!flock L, LOCK_EX|LOCK_NB) {
1479     croak("Can't lock $lockfile: $error_message\n");
1480   }
1481 }
1482
1483 sub find_docker_image {
1484   # Given a Keep locator, check to see if it contains a Docker image.
1485   # If so, return its stream name and Docker hash.
1486   # If not, return undef for both values.
1487   my $locator = shift;
1488   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1489     my @file_list = @{$image->{files}};
1490     if ((scalar(@file_list) == 1) &&
1491         ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1492       return ($file_list[0][0], $1);
1493     }
1494   }
1495   return (undef, undef);
1496 }
1497
1498 __DATA__
1499 #!/usr/bin/perl
1500
1501 # checkout-and-build
1502
1503 use Fcntl ':flock';
1504
1505 my $destdir = $ENV{"CRUNCH_SRC"};
1506 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1507 my $repo = $ENV{"CRUNCH_SRC_URL"};
1508
1509 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1510 flock L, LOCK_EX;
1511 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1512     exit 0;
1513 }
1514
1515 unlink "$destdir.commit";
1516 open STDOUT, ">", "$destdir.log";
1517 open STDERR, ">&STDOUT";
1518
1519 mkdir $destdir;
1520 my @git_archive_data = <DATA>;
1521 if (@git_archive_data) {
1522   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1523   print TARX @git_archive_data;
1524   if(!close(TARX)) {
1525     die "'tar -C $destdir -xf -' exited $?: $!";
1526   }
1527 }
1528
1529 my $pwd;
1530 chomp ($pwd = `pwd`);
1531 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1532 mkdir $install_dir;
1533
1534 for my $src_path ("$destdir/arvados/sdk/python") {
1535   if (-d $src_path) {
1536     shell_or_die ("virtualenv", $install_dir);
1537     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1538   }
1539 }
1540
1541 if (-e "$destdir/crunch_scripts/install") {
1542     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1543 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1544     # Old version
1545     shell_or_die ("./tests/autotests.sh", $install_dir);
1546 } elsif (-e "./install.sh") {
1547     shell_or_die ("./install.sh", $install_dir);
1548 }
1549
1550 if ($commit) {
1551     unlink "$destdir.commit.new";
1552     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1553     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1554 }
1555
1556 close L;
1557
1558 exit 0;
1559
1560 sub shell_or_die
1561 {
1562   if ($ENV{"DEBUG"}) {
1563     print STDERR "@_\n";
1564   }
1565   system (@_) == 0
1566       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1567 }
1568
1569 __DATA__