Merge branch '3660-project-editable' closes #3660
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 use constant EX_TEMPFAIL => 75;
88
89 $ENV{"TMPDIR"} ||= "/tmp";
90 unless (defined $ENV{"CRUNCH_TMP"}) {
91   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
92   if ($ENV{"USER"} ne "crunch" && $< != 0) {
93     # use a tmp dir unique for my uid
94     $ENV{"CRUNCH_TMP"} .= "-$<";
95   }
96 }
97
98 # Create the tmp directory if it does not exist
99 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
100   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 }
102
103 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
104 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
105 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
106 mkdir ($ENV{"JOB_WORK"});
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # If some other crunch-job process has grabbed this job (or we see
156     # other evidence that the job is already underway) we exit
157     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158     # mark the job as failed.
159     if ($Job->{'is_locked_by_uuid'}) {
160       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
161       exit EX_TEMPFAIL;
162     }
163     if ($Job->{'success'} ne undef) {
164       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
165       exit EX_TEMPFAIL;
166     }
167     if ($Job->{'running'}) {
168       Log(undef, "Job 'running' flag is already set");
169       exit EX_TEMPFAIL;
170     }
171     if ($Job->{'started_at'}) {
172       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
173       exit EX_TEMPFAIL;
174     }
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189
190   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191
192   $job_has_uuid = 1;
193 }
194 $job_id = $Job->{'uuid'};
195
196 my $keep_logfile = $job_id . '.log.txt';
197 $local_logfile = File::Temp->new();
198
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202
203
204 Log (undef, "check slurm allocation");
205 my @slot;
206 my @node;
207 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my @sinfo;
209 if (!$have_slurm)
210 {
211   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
212   push @sinfo, "$localcpus localhost";
213 }
214 if (exists $ENV{SLURM_NODELIST})
215 {
216   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 }
218 foreach (@sinfo)
219 {
220   my ($ncpus, $slurm_nodelist) = split;
221   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
222
223   my @nodelist;
224   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
225   {
226     my $nodelist = $1;
227     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
228     {
229       my $ranges = $1;
230       foreach (split (",", $ranges))
231       {
232         my ($a, $b);
233         if (/(\d+)-(\d+)/)
234         {
235           $a = $1;
236           $b = $2;
237         }
238         else
239         {
240           $a = $_;
241           $b = $_;
242         }
243         push @nodelist, map {
244           my $n = $nodelist;
245           $n =~ s/\[[-,\d]+\]/$_/;
246           $n;
247         } ($a..$b);
248       }
249     }
250     else
251     {
252       push @nodelist, $nodelist;
253     }
254   }
255   foreach my $nodename (@nodelist)
256   {
257     Log (undef, "node $nodename - $ncpus slots");
258     my $node = { name => $nodename,
259                  ncpus => $ncpus,
260                  losing_streak => 0,
261                  hold_until => 0 };
262     foreach my $cpu (1..$ncpus)
263     {
264       push @slot, { node => $node,
265                     cpu => $cpu };
266     }
267   }
268   push @node, @nodelist;
269 }
270
271
272
273 # Ensure that we get one jobstep running on each allocated node before
274 # we start overloading nodes with concurrent steps
275
276 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
277
278
279
280 my $jobmanager_id;
281 if ($job_has_uuid)
282 {
283   # Claim this job, and make sure nobody else does
284   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
285           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
286     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
287     exit EX_TEMPFAIL;
288   }
289   $Job->update_attributes('started_at' => scalar gmtime,
290                           'running' => 1,
291                           'success' => undef,
292                           'tasks_summary' => { 'failed' => 0,
293                                                'todo' => 1,
294                                                'running' => 0,
295                                                'done' => 0 });
296 }
297
298
299 Log (undef, "start");
300 $SIG{'INT'} = sub { $main::please_freeze = 1; };
301 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
302 $SIG{'TERM'} = \&croak;
303 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
304 $SIG{'ALRM'} = sub { $main::please_info = 1; };
305 $SIG{'CONT'} = sub { $main::please_continue = 1; };
306 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
307
308 $main::please_freeze = 0;
309 $main::please_info = 0;
310 $main::please_continue = 0;
311 $main::please_refresh = 0;
312 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
313
314 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
315 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
316 $ENV{"JOB_UUID"} = $job_id;
317
318
319 my @jobstep;
320 my @jobstep_todo = ();
321 my @jobstep_done = ();
322 my @jobstep_tomerge = ();
323 my $jobstep_tomerge_level = 0;
324 my $squeue_checked;
325 my $squeue_kill_checked;
326 my $output_in_keep = 0;
327 my $latest_refresh = scalar time;
328
329
330
331 if (defined $Job->{thawedfromkey})
332 {
333   thaw ($Job->{thawedfromkey});
334 }
335 else
336 {
337   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
338     'job_uuid' => $Job->{'uuid'},
339     'sequence' => 0,
340     'qsequence' => 0,
341     'parameters' => {},
342                                                           });
343   push @jobstep, { 'level' => 0,
344                    'failures' => 0,
345                    'arvados_task' => $first_task,
346                  };
347   push @jobstep_todo, 0;
348 }
349
350
351 if (!$have_slurm)
352 {
353   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
354 }
355
356
357 my $build_script;
358
359
360 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
361
362 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
363 if ($skip_install)
364 {
365   if (!defined $no_clear_tmp) {
366     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
367     system($clear_tmp_cmd) == 0
368         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
369   }
370   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
371   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
372     if (-d $src_path) {
373       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
374           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
375       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
376           == 0
377           or croak ("setup.py in $src_path failed: exit ".($?>>8));
378     }
379   }
380 }
381 else
382 {
383   do {
384     local $/ = undef;
385     $build_script = <DATA>;
386   };
387   Log (undef, "Install revision ".$Job->{script_version});
388   my $nodelist = join(",", @node);
389
390   if (!defined $no_clear_tmp) {
391     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
392
393     my $cleanpid = fork();
394     if ($cleanpid == 0)
395     {
396       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
397             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
398       exit (1);
399     }
400     while (1)
401     {
402       last if $cleanpid == waitpid (-1, WNOHANG);
403       freeze_if_want_freeze ($cleanpid);
404       select (undef, undef, undef, 0.1);
405     }
406     Log (undef, "Clean-work-dir exited $?");
407   }
408
409   # Install requested code version
410
411   my @execargs;
412   my @srunargs = ("srun",
413                   "--nodelist=$nodelist",
414                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
415
416   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
417   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
418
419   my $commit;
420   my $git_archive;
421   my $treeish = $Job->{'script_version'};
422
423   # If we're running under crunch-dispatch, it will have pulled the
424   # appropriate source tree into its own repository, and given us that
425   # repo's path as $git_dir. If we're running a "local" job, and a
426   # script_version was specified, it's up to the user to provide the
427   # full path to a local repository in Job->{repository}.
428   #
429   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
430   # git-archive --remote where appropriate.
431   #
432   # TODO: Accept a locally-hosted Arvados repository by name or
433   # UUID. Use arvados.v1.repositories.list or .get to figure out the
434   # appropriate fetch-url.
435   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
436
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   if (-d "$repo/.git") {
440     # We were given a working directory, but we are only interested in
441     # the index.
442     $repo = "$repo/.git";
443   }
444
445   # If this looks like a subversion r#, look for it in git-svn commit messages
446
447   if ($treeish =~ m{^\d{1,4}$}) {
448     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
449     chomp $gitlog;
450     if ($gitlog =~ /^[a-f0-9]{40}$/) {
451       $commit = $gitlog;
452       Log (undef, "Using commit $commit for script_version $treeish");
453     }
454   }
455
456   # If that didn't work, try asking git to look it up as a tree-ish.
457
458   if (!defined $commit) {
459     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
460     chomp $found;
461     if ($found =~ /^[0-9a-f]{40}$/s) {
462       $commit = $found;
463       if ($commit ne $treeish) {
464         # Make sure we record the real commit id in the database,
465         # frozentokey, logs, etc. -- instead of an abbreviation or a
466         # branch name which can become ambiguous or point to a
467         # different commit in the future.
468         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
469         Log (undef, "Using commit $commit for tree-ish $treeish");
470         if ($commit ne $treeish) {
471           $Job->{'script_version'} = $commit;
472           !$job_has_uuid or
473               $Job->update_attributes('script_version' => $commit) or
474               croak("Error while updating job");
475         }
476       }
477     }
478   }
479
480   if (defined $commit) {
481     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482     @execargs = ("sh", "-c",
483                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485   }
486   else {
487     croak ("could not figure out commit id for $treeish");
488   }
489
490   my $installpid = fork();
491   if ($installpid == 0)
492   {
493     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
494     exit (1);
495   }
496   while (1)
497   {
498     last if $installpid == waitpid (-1, WNOHANG);
499     freeze_if_want_freeze ($installpid);
500     select (undef, undef, undef, 0.1);
501   }
502   Log (undef, "Install exited $?");
503 }
504
505 if (!$have_slurm)
506 {
507   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
508   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
509 }
510
511 # If this job requires a Docker image, install that.
512 my $docker_bin = "/usr/bin/docker.io";
513 my ($docker_locator, $docker_stream, $docker_hash);
514 if ($docker_locator = $Job->{docker_image_locator}) {
515   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
516   if (!$docker_hash)
517   {
518     croak("No Docker image hash found from locator $docker_locator");
519   }
520   $docker_stream =~ s/^\.//;
521   my $docker_install_script = qq{
522 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
523     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
524 fi
525 };
526   my $docker_pid = fork();
527   if ($docker_pid == 0)
528   {
529     srun (["srun", "--nodelist=" . join(',', @node)],
530           ["/bin/sh", "-ec", $docker_install_script]);
531     exit ($?);
532   }
533   while (1)
534   {
535     last if $docker_pid == waitpid (-1, WNOHANG);
536     freeze_if_want_freeze ($docker_pid);
537     select (undef, undef, undef, 0.1);
538   }
539   if ($? != 0)
540   {
541     croak("Installing Docker image from $docker_locator returned exit code $?");
542   }
543 }
544
545 foreach (qw (script script_version script_parameters runtime_constraints))
546 {
547   Log (undef,
548        "$_ " .
549        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
550 }
551 foreach (split (/\n/, $Job->{knobs}))
552 {
553   Log (undef, "knob " . $_);
554 }
555
556
557
558 $main::success = undef;
559
560
561
562 ONELEVEL:
563
564 my $thisround_succeeded = 0;
565 my $thisround_failed = 0;
566 my $thisround_failed_multiple = 0;
567
568 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
569                        or $a <=> $b } @jobstep_todo;
570 my $level = $jobstep[$jobstep_todo[0]]->{level};
571 Log (undef, "start level $level");
572
573
574
575 my %proc;
576 my @freeslot = (0..$#slot);
577 my @holdslot;
578 my %reader;
579 my $progress_is_dirty = 1;
580 my $progress_stats_updated = 0;
581
582 update_progress_stats();
583
584
585
586 THISROUND:
587 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
588 {
589   my $id = $jobstep_todo[$todo_ptr];
590   my $Jobstep = $jobstep[$id];
591   if ($Jobstep->{level} != $level)
592   {
593     next;
594   }
595
596   pipe $reader{$id}, "writer" or croak ($!);
597   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
598   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
599
600   my $childslot = $freeslot[0];
601   my $childnode = $slot[$childslot]->{node};
602   my $childslotname = join (".",
603                             $slot[$childslot]->{node}->{name},
604                             $slot[$childslot]->{cpu});
605   my $childpid = fork();
606   if ($childpid == 0)
607   {
608     $SIG{'INT'} = 'DEFAULT';
609     $SIG{'QUIT'} = 'DEFAULT';
610     $SIG{'TERM'} = 'DEFAULT';
611
612     foreach (values (%reader))
613     {
614       close($_);
615     }
616     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
617     open(STDOUT,">&writer");
618     open(STDERR,">&writer");
619
620     undef $dbh;
621     undef $sth;
622
623     delete $ENV{"GNUPGHOME"};
624     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
625     $ENV{"TASK_QSEQUENCE"} = $id;
626     $ENV{"TASK_SEQUENCE"} = $level;
627     $ENV{"JOB_SCRIPT"} = $Job->{script};
628     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
629       $param =~ tr/a-z/A-Z/;
630       $ENV{"JOB_PARAMETER_$param"} = $value;
631     }
632     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
633     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
634     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
635     $ENV{"HOME"} = $ENV{"TASK_WORK"};
636     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
637     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
638     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
639     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
640
641     $ENV{"GZIP"} = "-n";
642
643     my @srunargs = (
644       "srun",
645       "--nodelist=".$childnode->{name},
646       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
647       "--job-name=$job_id.$id.$$",
648         );
649     my $build_script_to_send = "";
650     my $command =
651         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
652         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
653         ."&& cd $ENV{CRUNCH_TMP} ";
654     if ($build_script)
655     {
656       $build_script_to_send = $build_script;
657       $command .=
658           "&& perl -";
659     }
660     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
661     if ($docker_hash)
662     {
663       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665       # Dynamically configure the container to use the host system as its
666       # DNS server.  Get the host's global addresses from the ip command,
667       # and turn them into docker --dns options using gawk.
668       $command .=
669           q{$(ip -o address show scope global |
670               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
672       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
673       $command .= "--env=\QHOME=/home/crunch\E ";
674       while (my ($env_key, $env_val) = each %ENV)
675       {
676         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
677           if ($env_key eq "TASK_KEEPMOUNT") {
678             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
679           }
680           else {
681             $command .= "--env=\Q$env_key=$env_val\E ";
682           }
683         }
684       }
685       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
686       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
687       $command .= "\Q$docker_hash\E ";
688       $command .= "stdbuf --output=0 --error=0 ";
689       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
690     } else {
691       # Non-docker run
692       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
693       $command .= "stdbuf --output=0 --error=0 ";
694       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
695     }
696
697     my @execargs = ('bash', '-c', $command);
698     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
699     # exec() failed, we assume nothing happened.
700     Log(undef, "srun() failed on build script");
701     die;
702   }
703   close("writer");
704   if (!defined $childpid)
705   {
706     close $reader{$id};
707     delete $reader{$id};
708     next;
709   }
710   shift @freeslot;
711   $proc{$childpid} = { jobstep => $id,
712                        time => time,
713                        slot => $childslot,
714                        jobstepname => "$job_id.$id.$childpid",
715                      };
716   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
717   $slot[$childslot]->{pid} = $childpid;
718
719   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
720   Log ($id, "child $childpid started on $childslotname");
721   $Jobstep->{starttime} = time;
722   $Jobstep->{node} = $childnode->{name};
723   $Jobstep->{slotindex} = $childslot;
724   delete $Jobstep->{stderr};
725   delete $Jobstep->{finishtime};
726
727   splice @jobstep_todo, $todo_ptr, 1;
728   --$todo_ptr;
729
730   $progress_is_dirty = 1;
731
732   while (!@freeslot
733          ||
734          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
735   {
736     last THISROUND if $main::please_freeze;
737     if ($main::please_info)
738     {
739       $main::please_info = 0;
740       freeze();
741       collate_output();
742       save_meta(1);
743       update_progress_stats();
744     }
745     my $gotsome
746         = readfrompipes ()
747         + reapchildren ();
748     if (!$gotsome)
749     {
750       check_refresh_wanted();
751       check_squeue();
752       update_progress_stats();
753       select (undef, undef, undef, 0.1);
754     }
755     elsif (time - $progress_stats_updated >= 30)
756     {
757       update_progress_stats();
758     }
759     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
760         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
761     {
762       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
763           .($thisround_failed+$thisround_succeeded)
764           .") -- giving up on this round";
765       Log (undef, $message);
766       last THISROUND;
767     }
768
769     # move slots from freeslot to holdslot (or back to freeslot) if necessary
770     for (my $i=$#freeslot; $i>=0; $i--) {
771       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
772         push @holdslot, (splice @freeslot, $i, 1);
773       }
774     }
775     for (my $i=$#holdslot; $i>=0; $i--) {
776       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
777         push @freeslot, (splice @holdslot, $i, 1);
778       }
779     }
780
781     # give up if no nodes are succeeding
782     if (!grep { $_->{node}->{losing_streak} == 0 &&
783                     $_->{node}->{hold_count} < 4 } @slot) {
784       my $message = "Every node has failed -- giving up on this round";
785       Log (undef, $message);
786       last THISROUND;
787     }
788   }
789 }
790
791
792 push @freeslot, splice @holdslot;
793 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
794
795
796 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
797 while (%proc)
798 {
799   if ($main::please_continue) {
800     $main::please_continue = 0;
801     goto THISROUND;
802   }
803   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
804   readfrompipes ();
805   if (!reapchildren())
806   {
807     check_refresh_wanted();
808     check_squeue();
809     update_progress_stats();
810     select (undef, undef, undef, 0.1);
811     killem (keys %proc) if $main::please_freeze;
812   }
813 }
814
815 update_progress_stats();
816 freeze_if_want_freeze();
817
818
819 if (!defined $main::success)
820 {
821   if (@jobstep_todo &&
822       $thisround_succeeded == 0 &&
823       ($thisround_failed == 0 || $thisround_failed > 4))
824   {
825     my $message = "stop because $thisround_failed tasks failed and none succeeded";
826     Log (undef, $message);
827     $main::success = 0;
828   }
829   if (!@jobstep_todo)
830   {
831     $main::success = 1;
832   }
833 }
834
835 goto ONELEVEL if !defined $main::success;
836
837
838 release_allocation();
839 freeze();
840 my $collated_output = &collate_output();
841
842 if ($job_has_uuid) {
843   $Job->update_attributes('running' => 0,
844                           'success' => $collated_output && $main::success,
845                           'finished_at' => scalar gmtime)
846 }
847
848 if (!$collated_output) {
849   Log(undef, "output undef");
850 }
851 else {
852   eval {
853     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
854         or die "failed to get collated manifest: $!";
855     # Read the original manifest, and strip permission hints from it,
856     # so we can put the result in a Collection.
857     my @stripped_manifest_lines = ();
858     my $orig_manifest_text = '';
859     while (my $manifest_line = <$orig_manifest>) {
860       $orig_manifest_text .= $manifest_line;
861       my @words = split(/ /, $manifest_line, -1);
862       foreach my $ii (0..$#words) {
863         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
864           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
865         }
866       }
867       push(@stripped_manifest_lines, join(" ", @words));
868     }
869     my $stripped_manifest_text = join("", @stripped_manifest_lines);
870     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
871       'uuid' => md5_hex($stripped_manifest_text),
872       'manifest_text' => $orig_manifest_text,
873     });
874     Log(undef, "output " . $output->{uuid});
875     $Job->update_attributes('output' => $output->{uuid}) if $job_has_uuid;
876     if ($Job->{'output_is_persistent'}) {
877       $arv->{'links'}->{'create'}->execute('link' => {
878         'tail_kind' => 'arvados#user',
879         'tail_uuid' => $User->{'uuid'},
880         'head_kind' => 'arvados#collection',
881         'head_uuid' => $Job->{'output'},
882         'link_class' => 'resources',
883         'name' => 'wants',
884       });
885     }
886   };
887   if ($@) {
888     Log (undef, "Failed to register output manifest: $@");
889   }
890 }
891
892 Log (undef, "finish");
893
894 save_meta();
895 exit ($Job->{'success'} ? 1 : 0);
896
897
898
899 sub update_progress_stats
900 {
901   $progress_stats_updated = time;
902   return if !$progress_is_dirty;
903   my ($todo, $done, $running) = (scalar @jobstep_todo,
904                                  scalar @jobstep_done,
905                                  scalar @slot - scalar @freeslot - scalar @holdslot);
906   $Job->{'tasks_summary'} ||= {};
907   $Job->{'tasks_summary'}->{'todo'} = $todo;
908   $Job->{'tasks_summary'}->{'done'} = $done;
909   $Job->{'tasks_summary'}->{'running'} = $running;
910   if ($job_has_uuid) {
911     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
912   }
913   Log (undef, "status: $done done, $running running, $todo todo");
914   $progress_is_dirty = 0;
915 }
916
917
918
919 sub reapchildren
920 {
921   my $pid = waitpid (-1, WNOHANG);
922   return 0 if $pid <= 0;
923
924   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
925                   . "."
926                   . $slot[$proc{$pid}->{slot}]->{cpu});
927   my $jobstepid = $proc{$pid}->{jobstep};
928   my $elapsed = time - $proc{$pid}->{time};
929   my $Jobstep = $jobstep[$jobstepid];
930
931   my $childstatus = $?;
932   my $exitvalue = $childstatus >> 8;
933   my $exitinfo = sprintf("exit %d signal %d%s",
934                          $exitvalue,
935                          $childstatus & 127,
936                          ($childstatus & 128 ? ' core dump' : ''));
937   $Jobstep->{'arvados_task'}->reload;
938   my $task_success = $Jobstep->{'arvados_task'}->{success};
939
940   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
941
942   if (!defined $task_success) {
943     # task did not indicate one way or the other --> fail
944     $Jobstep->{'arvados_task'}->{success} = 0;
945     $Jobstep->{'arvados_task'}->save;
946     $task_success = 0;
947   }
948
949   if (!$task_success)
950   {
951     my $temporary_fail;
952     $temporary_fail ||= $Jobstep->{node_fail};
953     $temporary_fail ||= ($exitvalue == 111);
954
955     ++$thisround_failed;
956     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
957
958     # Check for signs of a failed or misconfigured node
959     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
960         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
961       # Don't count this against jobstep failure thresholds if this
962       # node is already suspected faulty and srun exited quickly
963       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
964           $elapsed < 5) {
965         Log ($jobstepid, "blaming failure on suspect node " .
966              $slot[$proc{$pid}->{slot}]->{node}->{name});
967         $temporary_fail ||= 1;
968       }
969       ban_node_by_slot($proc{$pid}->{slot});
970     }
971
972     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
973                              ++$Jobstep->{'failures'},
974                              $temporary_fail ? 'temporary ' : 'permanent',
975                              $elapsed));
976
977     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
978       # Give up on this task, and the whole job
979       $main::success = 0;
980       $main::please_freeze = 1;
981     }
982     else {
983       # Put this task back on the todo queue
984       push @jobstep_todo, $jobstepid;
985     }
986     $Job->{'tasks_summary'}->{'failed'}++;
987   }
988   else
989   {
990     ++$thisround_succeeded;
991     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
992     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
993     push @jobstep_done, $jobstepid;
994     Log ($jobstepid, "success in $elapsed seconds");
995   }
996   $Jobstep->{exitcode} = $childstatus;
997   $Jobstep->{finishtime} = time;
998   process_stderr ($jobstepid, $task_success);
999   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1000
1001   close $reader{$jobstepid};
1002   delete $reader{$jobstepid};
1003   delete $slot[$proc{$pid}->{slot}]->{pid};
1004   push @freeslot, $proc{$pid}->{slot};
1005   delete $proc{$pid};
1006
1007   if ($task_success) {
1008     # Load new tasks
1009     my $newtask_list = [];
1010     my $newtask_results;
1011     do {
1012       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1013         'where' => {
1014           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1015         },
1016         'order' => 'qsequence',
1017         'offset' => scalar(@$newtask_list),
1018       );
1019       push(@$newtask_list, @{$newtask_results->{items}});
1020     } while (@{$newtask_results->{items}});
1021     foreach my $arvados_task (@$newtask_list) {
1022       my $jobstep = {
1023         'level' => $arvados_task->{'sequence'},
1024         'failures' => 0,
1025         'arvados_task' => $arvados_task
1026       };
1027       push @jobstep, $jobstep;
1028       push @jobstep_todo, $#jobstep;
1029     }
1030   }
1031
1032   $progress_is_dirty = 1;
1033   1;
1034 }
1035
1036 sub check_refresh_wanted
1037 {
1038   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1039   if (@stat && $stat[9] > $latest_refresh) {
1040     $latest_refresh = scalar time;
1041     if ($job_has_uuid) {
1042       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1043       for my $attr ('cancelled_at',
1044                     'cancelled_by_user_uuid',
1045                     'cancelled_by_client_uuid') {
1046         $Job->{$attr} = $Job2->{$attr};
1047       }
1048       if ($Job->{'cancelled_at'}) {
1049         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1050              " by user " . $Job->{cancelled_by_user_uuid});
1051         $main::success = 0;
1052         $main::please_freeze = 1;
1053       }
1054     }
1055   }
1056 }
1057
1058 sub check_squeue
1059 {
1060   # return if the kill list was checked <4 seconds ago
1061   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1062   {
1063     return;
1064   }
1065   $squeue_kill_checked = time;
1066
1067   # use killem() on procs whose killtime is reached
1068   for (keys %proc)
1069   {
1070     if (exists $proc{$_}->{killtime}
1071         && $proc{$_}->{killtime} <= time)
1072     {
1073       killem ($_);
1074     }
1075   }
1076
1077   # return if the squeue was checked <60 seconds ago
1078   if (defined $squeue_checked && $squeue_checked > time - 60)
1079   {
1080     return;
1081   }
1082   $squeue_checked = time;
1083
1084   if (!$have_slurm)
1085   {
1086     # here is an opportunity to check for mysterious problems with local procs
1087     return;
1088   }
1089
1090   # get a list of steps still running
1091   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1092   chop @squeue;
1093   if ($squeue[-1] ne "ok")
1094   {
1095     return;
1096   }
1097   pop @squeue;
1098
1099   # which of my jobsteps are running, according to squeue?
1100   my %ok;
1101   foreach (@squeue)
1102   {
1103     if (/^(\d+)\.(\d+) (\S+)/)
1104     {
1105       if ($1 eq $ENV{SLURM_JOBID})
1106       {
1107         $ok{$3} = 1;
1108       }
1109     }
1110   }
1111
1112   # which of my active child procs (>60s old) were not mentioned by squeue?
1113   foreach (keys %proc)
1114   {
1115     if ($proc{$_}->{time} < time - 60
1116         && !exists $ok{$proc{$_}->{jobstepname}}
1117         && !exists $proc{$_}->{killtime})
1118     {
1119       # kill this proc if it hasn't exited in 30 seconds
1120       $proc{$_}->{killtime} = time + 30;
1121     }
1122   }
1123 }
1124
1125
1126 sub release_allocation
1127 {
1128   if ($have_slurm)
1129   {
1130     Log (undef, "release job allocation");
1131     system "scancel $ENV{SLURM_JOBID}";
1132   }
1133 }
1134
1135
1136 sub readfrompipes
1137 {
1138   my $gotsome = 0;
1139   foreach my $job (keys %reader)
1140   {
1141     my $buf;
1142     while (0 < sysread ($reader{$job}, $buf, 8192))
1143     {
1144       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1145       $jobstep[$job]->{stderr} .= $buf;
1146       preprocess_stderr ($job);
1147       if (length ($jobstep[$job]->{stderr}) > 16384)
1148       {
1149         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1150       }
1151       $gotsome = 1;
1152     }
1153   }
1154   return $gotsome;
1155 }
1156
1157
1158 sub preprocess_stderr
1159 {
1160   my $job = shift;
1161
1162   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1163     my $line = $1;
1164     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1165     Log ($job, "stderr $line");
1166     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1167       # whoa.
1168       $main::please_freeze = 1;
1169     }
1170     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1171       $jobstep[$job]->{node_fail} = 1;
1172       ban_node_by_slot($jobstep[$job]->{slotindex});
1173     }
1174   }
1175 }
1176
1177
1178 sub process_stderr
1179 {
1180   my $job = shift;
1181   my $task_success = shift;
1182   preprocess_stderr ($job);
1183
1184   map {
1185     Log ($job, "stderr $_");
1186   } split ("\n", $jobstep[$job]->{stderr});
1187 }
1188
1189 sub fetch_block
1190 {
1191   my $hash = shift;
1192   my ($keep, $child_out, $output_block);
1193
1194   my $cmd = "arv-get \Q$hash\E";
1195   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1196   $output_block = '';
1197   while (1) {
1198     my $buf;
1199     my $bytes = sysread($keep, $buf, 1024 * 1024);
1200     if (!defined $bytes) {
1201       die "reading from arv-get: $!";
1202     } elsif ($bytes == 0) {
1203       # sysread returns 0 at the end of the pipe.
1204       last;
1205     } else {
1206       # some bytes were read into buf.
1207       $output_block .= $buf;
1208     }
1209   }
1210   close $keep;
1211   return $output_block;
1212 }
1213
1214 sub collate_output
1215 {
1216   Log (undef, "collate");
1217
1218   my ($child_out, $child_in);
1219   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1220   my $joboutput;
1221   for (@jobstep)
1222   {
1223     next if (!exists $_->{'arvados_task'}->{'output'} ||
1224              !$_->{'arvados_task'}->{'success'});
1225     my $output = $_->{'arvados_task'}->{output};
1226     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1227     {
1228       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1229       print $child_in $output;
1230     }
1231     elsif (@jobstep == 1)
1232     {
1233       $joboutput = $output;
1234       last;
1235     }
1236     elsif (defined (my $outblock = fetch_block ($output)))
1237     {
1238       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1239       print $child_in $outblock;
1240     }
1241     else
1242     {
1243       Log (undef, "XXX fetch_block($output) failed XXX");
1244       $main::success = 0;
1245     }
1246   }
1247   $child_in->close;
1248
1249   if (!defined $joboutput) {
1250     my $s = IO::Select->new($child_out);
1251     if ($s->can_read(120)) {
1252       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1253       chomp($joboutput);
1254     } else {
1255       Log (undef, "timed out reading from 'arv-put'");
1256     }
1257   }
1258   waitpid($pid, 0);
1259
1260   return $joboutput;
1261 }
1262
1263
1264 sub killem
1265 {
1266   foreach (@_)
1267   {
1268     my $sig = 2;                # SIGINT first
1269     if (exists $proc{$_}->{"sent_$sig"} &&
1270         time - $proc{$_}->{"sent_$sig"} > 4)
1271     {
1272       $sig = 15;                # SIGTERM if SIGINT doesn't work
1273     }
1274     if (exists $proc{$_}->{"sent_$sig"} &&
1275         time - $proc{$_}->{"sent_$sig"} > 4)
1276     {
1277       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1278     }
1279     if (!exists $proc{$_}->{"sent_$sig"})
1280     {
1281       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1282       kill $sig, $_;
1283       select (undef, undef, undef, 0.1);
1284       if ($sig == 2)
1285       {
1286         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1287       }
1288       $proc{$_}->{"sent_$sig"} = time;
1289       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1290     }
1291   }
1292 }
1293
1294
1295 sub fhbits
1296 {
1297   my($bits);
1298   for (@_) {
1299     vec($bits,fileno($_),1) = 1;
1300   }
1301   $bits;
1302 }
1303
1304
1305 sub Log                         # ($jobstep_id, $logmessage)
1306 {
1307   if ($_[1] =~ /\n/) {
1308     for my $line (split (/\n/, $_[1])) {
1309       Log ($_[0], $line);
1310     }
1311     return;
1312   }
1313   my $fh = select STDERR; $|=1; select $fh;
1314   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1315   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1316   $message .= "\n";
1317   my $datetime;
1318   if ($local_logfile || -t STDERR) {
1319     my @gmtime = gmtime;
1320     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1321                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1322   }
1323   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1324
1325   if ($local_logfile) {
1326     print $local_logfile $datetime . " " . $message;
1327   }
1328 }
1329
1330
1331 sub croak
1332 {
1333   my ($package, $file, $line) = caller;
1334   my $message = "@_ at $file line $line\n";
1335   Log (undef, $message);
1336   freeze() if @jobstep_todo;
1337   collate_output() if @jobstep_todo;
1338   cleanup();
1339   save_meta() if $local_logfile;
1340   die;
1341 }
1342
1343
1344 sub cleanup
1345 {
1346   return if !$job_has_uuid;
1347   $Job->update_attributes('running' => 0,
1348                           'success' => 0,
1349                           'finished_at' => scalar gmtime);
1350 }
1351
1352
1353 sub save_meta
1354 {
1355   my $justcheckpoint = shift; # false if this will be the last meta saved
1356   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1357
1358   $local_logfile->flush;
1359   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1360       . quotemeta($local_logfile->filename);
1361   my $loglocator = `$cmd`;
1362   die "system $cmd failed: $?" if $?;
1363   chomp($loglocator);
1364
1365   $local_logfile = undef;   # the temp file is automatically deleted
1366   Log (undef, "log manifest is $loglocator");
1367   $Job->{'log'} = $loglocator;
1368   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1369 }
1370
1371
1372 sub freeze_if_want_freeze
1373 {
1374   if ($main::please_freeze)
1375   {
1376     release_allocation();
1377     if (@_)
1378     {
1379       # kill some srun procs before freeze+stop
1380       map { $proc{$_} = {} } @_;
1381       while (%proc)
1382       {
1383         killem (keys %proc);
1384         select (undef, undef, undef, 0.1);
1385         my $died;
1386         while (($died = waitpid (-1, WNOHANG)) > 0)
1387         {
1388           delete $proc{$died};
1389         }
1390       }
1391     }
1392     freeze();
1393     collate_output();
1394     cleanup();
1395     save_meta();
1396     exit 0;
1397   }
1398 }
1399
1400
1401 sub freeze
1402 {
1403   Log (undef, "Freeze not implemented");
1404   return;
1405 }
1406
1407
1408 sub thaw
1409 {
1410   croak ("Thaw not implemented");
1411 }
1412
1413
1414 sub freezequote
1415 {
1416   my $s = shift;
1417   $s =~ s/\\/\\\\/g;
1418   $s =~ s/\n/\\n/g;
1419   return $s;
1420 }
1421
1422
1423 sub freezeunquote
1424 {
1425   my $s = shift;
1426   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1427   return $s;
1428 }
1429
1430
1431 sub srun
1432 {
1433   my $srunargs = shift;
1434   my $execargs = shift;
1435   my $opts = shift || {};
1436   my $stdin = shift;
1437   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1438   print STDERR (join (" ",
1439                       map { / / ? "'$_'" : $_ }
1440                       (@$args)),
1441                 "\n")
1442       if $ENV{CRUNCH_DEBUG};
1443
1444   if (defined $stdin) {
1445     my $child = open STDIN, "-|";
1446     defined $child or die "no fork: $!";
1447     if ($child == 0) {
1448       print $stdin or die $!;
1449       close STDOUT or die $!;
1450       exit 0;
1451     }
1452   }
1453
1454   return system (@$args) if $opts->{fork};
1455
1456   exec @$args;
1457   warn "ENV size is ".length(join(" ",%ENV));
1458   die "exec failed: $!: @$args";
1459 }
1460
1461
1462 sub ban_node_by_slot {
1463   # Don't start any new jobsteps on this node for 60 seconds
1464   my $slotid = shift;
1465   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1466   $slot[$slotid]->{node}->{hold_count}++;
1467   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1468 }
1469
1470 sub must_lock_now
1471 {
1472   my ($lockfile, $error_message) = @_;
1473   open L, ">", $lockfile or croak("$lockfile: $!");
1474   if (!flock L, LOCK_EX|LOCK_NB) {
1475     croak("Can't lock $lockfile: $error_message\n");
1476   }
1477 }
1478
1479 sub find_docker_image {
1480   # Given a Keep locator, check to see if it contains a Docker image.
1481   # If so, return its stream name and Docker hash.
1482   # If not, return undef for both values.
1483   my $locator = shift;
1484   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1485     my @file_list = @{$image->{files}};
1486     if ((scalar(@file_list) == 1) &&
1487         ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1488       return ($file_list[0][0], $1);
1489     }
1490   }
1491   return (undef, undef);
1492 }
1493
1494 __DATA__
1495 #!/usr/bin/perl
1496
1497 # checkout-and-build
1498
1499 use Fcntl ':flock';
1500 use File::Path qw( make_path );
1501
1502 my $destdir = $ENV{"CRUNCH_SRC"};
1503 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1504 my $repo = $ENV{"CRUNCH_SRC_URL"};
1505 my $task_work = $ENV{"TASK_WORK"};
1506
1507 make_path $task_work or die "Failed to create temporary working directory ($task_work): $!";
1508
1509 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1510 flock L, LOCK_EX;
1511 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1512     exit 0;
1513 }
1514
1515 unlink "$destdir.commit";
1516 open STDOUT, ">", "$destdir.log";
1517 open STDERR, ">&STDOUT";
1518
1519 mkdir $destdir;
1520 my @git_archive_data = <DATA>;
1521 if (@git_archive_data) {
1522   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1523   print TARX @git_archive_data;
1524   if(!close(TARX)) {
1525     die "'tar -C $destdir -xf -' exited $?: $!";
1526   }
1527 }
1528
1529 my $pwd;
1530 chomp ($pwd = `pwd`);
1531 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1532 mkdir $install_dir;
1533
1534 for my $src_path ("$destdir/arvados/sdk/python") {
1535   if (-d $src_path) {
1536     shell_or_die ("virtualenv", $install_dir);
1537     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1538   }
1539 }
1540
1541 if (-e "$destdir/crunch_scripts/install") {
1542     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1543 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1544     # Old version
1545     shell_or_die ("./tests/autotests.sh", $install_dir);
1546 } elsif (-e "./install.sh") {
1547     shell_or_die ("./install.sh", $install_dir);
1548 }
1549
1550 if ($commit) {
1551     unlink "$destdir.commit.new";
1552     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1553     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1554 }
1555
1556 close L;
1557
1558 exit 0;
1559
1560 sub shell_or_die
1561 {
1562   if ($ENV{"DEBUG"}) {
1563     print STDERR "@_\n";
1564   }
1565   system (@_) == 0
1566       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1567 }
1568
1569 __DATA__