]> git.arvados.org - arvados.git/blob - sdk/cli/bin/crunch-job
Run the task setup script inside the container (if any), instead of
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 use constant EX_TEMPFAIL => 75;
88
89 $ENV{"TMPDIR"} ||= "/tmp";
90 unless (defined $ENV{"CRUNCH_TMP"}) {
91   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
92   if ($ENV{"USER"} ne "crunch" && $< != 0) {
93     # use a tmp dir unique for my uid
94     $ENV{"CRUNCH_TMP"} .= "-$<";
95   }
96 }
97
98 # Create the tmp directory if it does not exist
99 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
100   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 }
102
103 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
104 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
105 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
106 mkdir ($ENV{"JOB_WORK"});
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # If some other crunch-job process has grabbed this job (or we see
156     # other evidence that the job is already underway) we exit
157     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158     # mark the job as failed.
159     if ($Job->{'is_locked_by_uuid'}) {
160       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
161       exit EX_TEMPFAIL;
162     }
163     if ($Job->{'success'} ne undef) {
164       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
165       exit EX_TEMPFAIL;
166     }
167     if ($Job->{'running'}) {
168       Log(undef, "Job 'running' flag is already set");
169       exit EX_TEMPFAIL;
170     }
171     if ($Job->{'started_at'}) {
172       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
173       exit EX_TEMPFAIL;
174     }
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189
190   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191
192   $job_has_uuid = 1;
193 }
194 $job_id = $Job->{'uuid'};
195
196 my $keep_logfile = $job_id . '.log.txt';
197 $local_logfile = File::Temp->new();
198
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202
203
204 Log (undef, "check slurm allocation");
205 my @slot;
206 my @node;
207 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my @sinfo;
209 if (!$have_slurm)
210 {
211   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
212   push @sinfo, "$localcpus localhost";
213 }
214 if (exists $ENV{SLURM_NODELIST})
215 {
216   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 }
218 foreach (@sinfo)
219 {
220   my ($ncpus, $slurm_nodelist) = split;
221   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
222
223   my @nodelist;
224   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
225   {
226     my $nodelist = $1;
227     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
228     {
229       my $ranges = $1;
230       foreach (split (",", $ranges))
231       {
232         my ($a, $b);
233         if (/(\d+)-(\d+)/)
234         {
235           $a = $1;
236           $b = $2;
237         }
238         else
239         {
240           $a = $_;
241           $b = $_;
242         }
243         push @nodelist, map {
244           my $n = $nodelist;
245           $n =~ s/\[[-,\d]+\]/$_/;
246           $n;
247         } ($a..$b);
248       }
249     }
250     else
251     {
252       push @nodelist, $nodelist;
253     }
254   }
255   foreach my $nodename (@nodelist)
256   {
257     Log (undef, "node $nodename - $ncpus slots");
258     my $node = { name => $nodename,
259                  ncpus => $ncpus,
260                  losing_streak => 0,
261                  hold_until => 0 };
262     foreach my $cpu (1..$ncpus)
263     {
264       push @slot, { node => $node,
265                     cpu => $cpu };
266     }
267   }
268   push @node, @nodelist;
269 }
270
271
272
273 # Ensure that we get one jobstep running on each allocated node before
274 # we start overloading nodes with concurrent steps
275
276 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
277
278
279
280 my $jobmanager_id;
281 if ($job_has_uuid)
282 {
283   # Claim this job, and make sure nobody else does
284   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
285           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
286     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
287     exit EX_TEMPFAIL;
288   }
289   $Job->update_attributes('started_at' => scalar gmtime,
290                           'running' => 1,
291                           'success' => undef,
292                           'tasks_summary' => { 'failed' => 0,
293                                                'todo' => 1,
294                                                'running' => 0,
295                                                'done' => 0 });
296 }
297
298
299 Log (undef, "start");
300 $SIG{'INT'} = sub { $main::please_freeze = 1; };
301 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
302 $SIG{'TERM'} = \&croak;
303 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
304 $SIG{'ALRM'} = sub { $main::please_info = 1; };
305 $SIG{'CONT'} = sub { $main::please_continue = 1; };
306 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
307
308 $main::please_freeze = 0;
309 $main::please_info = 0;
310 $main::please_continue = 0;
311 $main::please_refresh = 0;
312 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
313
314 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
315 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
316 $ENV{"JOB_UUID"} = $job_id;
317
318
319 my @jobstep;
320 my @jobstep_todo = ();
321 my @jobstep_done = ();
322 my @jobstep_tomerge = ();
323 my $jobstep_tomerge_level = 0;
324 my $squeue_checked;
325 my $squeue_kill_checked;
326 my $output_in_keep = 0;
327 my $latest_refresh = scalar time;
328
329
330
331 if (defined $Job->{thawedfromkey})
332 {
333   thaw ($Job->{thawedfromkey});
334 }
335 else
336 {
337   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
338     'job_uuid' => $Job->{'uuid'},
339     'sequence' => 0,
340     'qsequence' => 0,
341     'parameters' => {},
342                                                           });
343   push @jobstep, { 'level' => 0,
344                    'failures' => 0,
345                    'arvados_task' => $first_task,
346                  };
347   push @jobstep_todo, 0;
348 }
349
350
351 if (!$have_slurm)
352 {
353   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
354 }
355
356
357 my $build_script;
358
359
360 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
361
362 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
363 if ($skip_install)
364 {
365   if (!defined $no_clear_tmp) {
366     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
367     system($clear_tmp_cmd) == 0
368         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
369   }
370   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
371   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
372     if (-d $src_path) {
373       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
374           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
375       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
376           == 0
377           or croak ("setup.py in $src_path failed: exit ".($?>>8));
378     }
379   }
380 }
381 else
382 {
383   do {
384     local $/ = undef;
385     $build_script = <DATA>;
386   };
387   Log (undef, "Install revision ".$Job->{script_version});
388   my $nodelist = join(",", @node);
389
390   if (!defined $no_clear_tmp) {
391     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
392
393     my $cleanpid = fork();
394     if ($cleanpid == 0)
395     {
396       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
397             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
398       exit (1);
399     }
400     while (1)
401     {
402       last if $cleanpid == waitpid (-1, WNOHANG);
403       freeze_if_want_freeze ($cleanpid);
404       select (undef, undef, undef, 0.1);
405     }
406     Log (undef, "Clean-work-dir exited $?");
407   }
408
409   # Install requested code version
410
411   my @execargs;
412   my @srunargs = ("srun",
413                   "--nodelist=$nodelist",
414                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
415
416   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
417   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
418
419   my $commit;
420   my $git_archive;
421   my $treeish = $Job->{'script_version'};
422
423   # If we're running under crunch-dispatch, it will have pulled the
424   # appropriate source tree into its own repository, and given us that
425   # repo's path as $git_dir. If we're running a "local" job, and a
426   # script_version was specified, it's up to the user to provide the
427   # full path to a local repository in Job->{repository}.
428   #
429   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
430   # git-archive --remote where appropriate.
431   #
432   # TODO: Accept a locally-hosted Arvados repository by name or
433   # UUID. Use arvados.v1.repositories.list or .get to figure out the
434   # appropriate fetch-url.
435   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
436
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   if (-d "$repo/.git") {
440     # We were given a working directory, but we are only interested in
441     # the index.
442     $repo = "$repo/.git";
443   }
444
445   # If this looks like a subversion r#, look for it in git-svn commit messages
446
447   if ($treeish =~ m{^\d{1,4}$}) {
448     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
449     chomp $gitlog;
450     if ($gitlog =~ /^[a-f0-9]{40}$/) {
451       $commit = $gitlog;
452       Log (undef, "Using commit $commit for script_version $treeish");
453     }
454   }
455
456   # If that didn't work, try asking git to look it up as a tree-ish.
457
458   if (!defined $commit) {
459     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
460     chomp $found;
461     if ($found =~ /^[0-9a-f]{40}$/s) {
462       $commit = $found;
463       if ($commit ne $treeish) {
464         # Make sure we record the real commit id in the database,
465         # frozentokey, logs, etc. -- instead of an abbreviation or a
466         # branch name which can become ambiguous or point to a
467         # different commit in the future.
468         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
469         Log (undef, "Using commit $commit for tree-ish $treeish");
470         if ($commit ne $treeish) {
471           $Job->{'script_version'} = $commit;
472           !$job_has_uuid or
473               $Job->update_attributes('script_version' => $commit) or
474               croak("Error while updating job");
475         }
476       }
477     }
478   }
479
480   if (defined $commit) {
481     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482     @execargs = ("sh", "-c",
483                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485   }
486   else {
487     croak ("could not figure out commit id for $treeish");
488   }
489
490   # Note: this section is almost certainly unnecessary if we're
491   # running tasks in docker containers.
492   my $installpid = fork();
493   if ($installpid == 0)
494   {
495     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
496     exit (1);
497   }
498   while (1)
499   {
500     last if $installpid == waitpid (-1, WNOHANG);
501     freeze_if_want_freeze ($installpid);
502     select (undef, undef, undef, 0.1);
503   }
504   Log (undef, "Install exited $?");
505 }
506
507 if (!$have_slurm)
508 {
509   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
510   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
511 }
512
513 # If this job requires a Docker image, install that.
514 my $docker_bin = "/usr/bin/docker.io";
515 my ($docker_locator, $docker_stream, $docker_hash);
516 if ($docker_locator = $Job->{docker_image_locator}) {
517   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
518   if (!$docker_hash)
519   {
520     croak("No Docker image hash found from locator $docker_locator");
521   }
522   $docker_stream =~ s/^\.//;
523   my $docker_install_script = qq{
524 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
525     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
526 fi
527 };
528   my $docker_pid = fork();
529   if ($docker_pid == 0)
530   {
531     srun (["srun", "--nodelist=" . join(',', @node)],
532           ["/bin/sh", "-ec", $docker_install_script]);
533     exit ($?);
534   }
535   while (1)
536   {
537     last if $docker_pid == waitpid (-1, WNOHANG);
538     freeze_if_want_freeze ($docker_pid);
539     select (undef, undef, undef, 0.1);
540   }
541   if ($? != 0)
542   {
543     croak("Installing Docker image from $docker_locator returned exit code $?");
544   }
545 }
546
547 foreach (qw (script script_version script_parameters runtime_constraints))
548 {
549   Log (undef,
550        "$_ " .
551        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
552 }
553 foreach (split (/\n/, $Job->{knobs}))
554 {
555   Log (undef, "knob " . $_);
556 }
557
558
559
560 $main::success = undef;
561
562
563
564 ONELEVEL:
565
566 my $thisround_succeeded = 0;
567 my $thisround_failed = 0;
568 my $thisround_failed_multiple = 0;
569
570 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
571                        or $a <=> $b } @jobstep_todo;
572 my $level = $jobstep[$jobstep_todo[0]]->{level};
573 Log (undef, "start level $level");
574
575
576
577 my %proc;
578 my @freeslot = (0..$#slot);
579 my @holdslot;
580 my %reader;
581 my $progress_is_dirty = 1;
582 my $progress_stats_updated = 0;
583
584 update_progress_stats();
585
586
587
588 THISROUND:
589 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
590 {
591   my $id = $jobstep_todo[$todo_ptr];
592   my $Jobstep = $jobstep[$id];
593   if ($Jobstep->{level} != $level)
594   {
595     next;
596   }
597
598   pipe $reader{$id}, "writer" or croak ($!);
599   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
600   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
601
602   my $childslot = $freeslot[0];
603   my $childnode = $slot[$childslot]->{node};
604   my $childslotname = join (".",
605                             $slot[$childslot]->{node}->{name},
606                             $slot[$childslot]->{cpu});
607   my $childpid = fork();
608   if ($childpid == 0)
609   {
610     $SIG{'INT'} = 'DEFAULT';
611     $SIG{'QUIT'} = 'DEFAULT';
612     $SIG{'TERM'} = 'DEFAULT';
613
614     foreach (values (%reader))
615     {
616       close($_);
617     }
618     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
619     open(STDOUT,">&writer");
620     open(STDERR,">&writer");
621
622     undef $dbh;
623     undef $sth;
624
625     delete $ENV{"GNUPGHOME"};
626     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
627     $ENV{"TASK_QSEQUENCE"} = $id;
628     $ENV{"TASK_SEQUENCE"} = $level;
629     $ENV{"JOB_SCRIPT"} = $Job->{script};
630     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
631       $param =~ tr/a-z/A-Z/;
632       $ENV{"JOB_PARAMETER_$param"} = $value;
633     }
634     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
635     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
636     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
637     $ENV{"HOME"} = $ENV{"TASK_WORK"};
638     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
639     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
640     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
641     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
642
643     $ENV{"GZIP"} = "-n";
644
645     my @srunargs = (
646       "srun",
647       "--nodelist=".$childnode->{name},
648       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
649       "--job-name=$job_id.$id.$$",
650         );
651     my $build_script_to_send = "";
652     my $command =
653         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
654         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
655         ."&& cd $ENV{CRUNCH_TMP} ";
656     if ($build_script)
657     {
658       $build_script_to_send = $build_script;
659     }
660     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
661     if ($docker_hash)
662     {
663       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665       # Dynamically configure the container to use the host system as its
666       # DNS server.  Get the host's global addresses from the ip command,
667       # and turn them into docker --dns options using gawk.
668       $command .=
669           q{$(ip -o address show scope global |
670               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
672       $command .= "--env=\QHOME=/home/crunch\E ";
673       while (my ($env_key, $env_val) = each %ENV)
674       {
675         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
676           if ($env_key eq "TASK_KEEPMOUNT") {
677             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
678           }
679           else {
680             $command .= "--env=\Q$env_key=$env_val\E ";
681           }
682         }
683       }
684       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
685       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
686       $command .= "\Q$docker_hash\E ";
687       $command .= "stdbuf --output=0 --error=0 ";
688       $command .= "perl - ";
689       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
690     } else {
691       # Non-docker run
692       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
693       $command .= "stdbuf --output=0 --error=0 ";
694       $command .= "perl - ";
695       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
696     }
697
698     my @execargs = ('bash', '-c', $command);
699     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
700     # exec() failed, we assume nothing happened.
701     Log(undef, "srun() failed on build script");
702     die;
703   }
704   close("writer");
705   if (!defined $childpid)
706   {
707     close $reader{$id};
708     delete $reader{$id};
709     next;
710   }
711   shift @freeslot;
712   $proc{$childpid} = { jobstep => $id,
713                        time => time,
714                        slot => $childslot,
715                        jobstepname => "$job_id.$id.$childpid",
716                      };
717   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
718   $slot[$childslot]->{pid} = $childpid;
719
720   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
721   Log ($id, "child $childpid started on $childslotname");
722   $Jobstep->{starttime} = time;
723   $Jobstep->{node} = $childnode->{name};
724   $Jobstep->{slotindex} = $childslot;
725   delete $Jobstep->{stderr};
726   delete $Jobstep->{finishtime};
727
728   splice @jobstep_todo, $todo_ptr, 1;
729   --$todo_ptr;
730
731   $progress_is_dirty = 1;
732
733   while (!@freeslot
734          ||
735          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
736   {
737     last THISROUND if $main::please_freeze;
738     if ($main::please_info)
739     {
740       $main::please_info = 0;
741       freeze();
742       collate_output();
743       save_meta(1);
744       update_progress_stats();
745     }
746     my $gotsome
747         = readfrompipes ()
748         + reapchildren ();
749     if (!$gotsome)
750     {
751       check_refresh_wanted();
752       check_squeue();
753       update_progress_stats();
754       select (undef, undef, undef, 0.1);
755     }
756     elsif (time - $progress_stats_updated >= 30)
757     {
758       update_progress_stats();
759     }
760     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
761         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
762     {
763       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
764           .($thisround_failed+$thisround_succeeded)
765           .") -- giving up on this round";
766       Log (undef, $message);
767       last THISROUND;
768     }
769
770     # move slots from freeslot to holdslot (or back to freeslot) if necessary
771     for (my $i=$#freeslot; $i>=0; $i--) {
772       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
773         push @holdslot, (splice @freeslot, $i, 1);
774       }
775     }
776     for (my $i=$#holdslot; $i>=0; $i--) {
777       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
778         push @freeslot, (splice @holdslot, $i, 1);
779       }
780     }
781
782     # give up if no nodes are succeeding
783     if (!grep { $_->{node}->{losing_streak} == 0 &&
784                     $_->{node}->{hold_count} < 4 } @slot) {
785       my $message = "Every node has failed -- giving up on this round";
786       Log (undef, $message);
787       last THISROUND;
788     }
789   }
790 }
791
792
793 push @freeslot, splice @holdslot;
794 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
795
796
797 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
798 while (%proc)
799 {
800   if ($main::please_continue) {
801     $main::please_continue = 0;
802     goto THISROUND;
803   }
804   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
805   readfrompipes ();
806   if (!reapchildren())
807   {
808     check_refresh_wanted();
809     check_squeue();
810     update_progress_stats();
811     select (undef, undef, undef, 0.1);
812     killem (keys %proc) if $main::please_freeze;
813   }
814 }
815
816 update_progress_stats();
817 freeze_if_want_freeze();
818
819
820 if (!defined $main::success)
821 {
822   if (@jobstep_todo &&
823       $thisround_succeeded == 0 &&
824       ($thisround_failed == 0 || $thisround_failed > 4))
825   {
826     my $message = "stop because $thisround_failed tasks failed and none succeeded";
827     Log (undef, $message);
828     $main::success = 0;
829   }
830   if (!@jobstep_todo)
831   {
832     $main::success = 1;
833   }
834 }
835
836 goto ONELEVEL if !defined $main::success;
837
838
839 release_allocation();
840 freeze();
841 my $collated_output = &collate_output();
842
843 if ($job_has_uuid) {
844   $Job->update_attributes('running' => 0,
845                           'success' => $collated_output && $main::success,
846                           'finished_at' => scalar gmtime)
847 }
848
849 if (!$collated_output) {
850   Log(undef, "output undef");
851 }
852 else {
853   eval {
854     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
855         or die "failed to get collated manifest: $!";
856     # Read the original manifest, and strip permission hints from it,
857     # so we can put the result in a Collection.
858     my @stripped_manifest_lines = ();
859     my $orig_manifest_text = '';
860     while (my $manifest_line = <$orig_manifest>) {
861       $orig_manifest_text .= $manifest_line;
862       my @words = split(/ /, $manifest_line, -1);
863       foreach my $ii (0..$#words) {
864         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
865           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
866         }
867       }
868       push(@stripped_manifest_lines, join(" ", @words));
869     }
870     my $stripped_manifest_text = join("", @stripped_manifest_lines);
871     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
872       'uuid' => md5_hex($stripped_manifest_text),
873       'manifest_text' => $orig_manifest_text,
874     });
875     Log(undef, "output " . $output->{uuid});
876     $Job->update_attributes('output' => $output->{uuid}) if $job_has_uuid;
877     if ($Job->{'output_is_persistent'}) {
878       $arv->{'links'}->{'create'}->execute('link' => {
879         'tail_kind' => 'arvados#user',
880         'tail_uuid' => $User->{'uuid'},
881         'head_kind' => 'arvados#collection',
882         'head_uuid' => $Job->{'output'},
883         'link_class' => 'resources',
884         'name' => 'wants',
885       });
886     }
887   };
888   if ($@) {
889     Log (undef, "Failed to register output manifest: $@");
890   }
891 }
892
893 Log (undef, "finish");
894
895 save_meta();
896 exit ($Job->{'success'} ? 1 : 0);
897
898
899
900 sub update_progress_stats
901 {
902   $progress_stats_updated = time;
903   return if !$progress_is_dirty;
904   my ($todo, $done, $running) = (scalar @jobstep_todo,
905                                  scalar @jobstep_done,
906                                  scalar @slot - scalar @freeslot - scalar @holdslot);
907   $Job->{'tasks_summary'} ||= {};
908   $Job->{'tasks_summary'}->{'todo'} = $todo;
909   $Job->{'tasks_summary'}->{'done'} = $done;
910   $Job->{'tasks_summary'}->{'running'} = $running;
911   if ($job_has_uuid) {
912     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
913   }
914   Log (undef, "status: $done done, $running running, $todo todo");
915   $progress_is_dirty = 0;
916 }
917
918
919
920 sub reapchildren
921 {
922   my $pid = waitpid (-1, WNOHANG);
923   return 0 if $pid <= 0;
924
925   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
926                   . "."
927                   . $slot[$proc{$pid}->{slot}]->{cpu});
928   my $jobstepid = $proc{$pid}->{jobstep};
929   my $elapsed = time - $proc{$pid}->{time};
930   my $Jobstep = $jobstep[$jobstepid];
931
932   my $childstatus = $?;
933   my $exitvalue = $childstatus >> 8;
934   my $exitinfo = sprintf("exit %d signal %d%s",
935                          $exitvalue,
936                          $childstatus & 127,
937                          ($childstatus & 128 ? ' core dump' : ''));
938   $Jobstep->{'arvados_task'}->reload;
939   my $task_success = $Jobstep->{'arvados_task'}->{success};
940
941   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
942
943   if (!defined $task_success) {
944     # task did not indicate one way or the other --> fail
945     $Jobstep->{'arvados_task'}->{success} = 0;
946     $Jobstep->{'arvados_task'}->save;
947     $task_success = 0;
948   }
949
950   if (!$task_success)
951   {
952     my $temporary_fail;
953     $temporary_fail ||= $Jobstep->{node_fail};
954     $temporary_fail ||= ($exitvalue == 111);
955
956     ++$thisround_failed;
957     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
958
959     # Check for signs of a failed or misconfigured node
960     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
961         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
962       # Don't count this against jobstep failure thresholds if this
963       # node is already suspected faulty and srun exited quickly
964       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
965           $elapsed < 5) {
966         Log ($jobstepid, "blaming failure on suspect node " .
967              $slot[$proc{$pid}->{slot}]->{node}->{name});
968         $temporary_fail ||= 1;
969       }
970       ban_node_by_slot($proc{$pid}->{slot});
971     }
972
973     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
974                              ++$Jobstep->{'failures'},
975                              $temporary_fail ? 'temporary ' : 'permanent',
976                              $elapsed));
977
978     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
979       # Give up on this task, and the whole job
980       $main::success = 0;
981       $main::please_freeze = 1;
982     }
983     else {
984       # Put this task back on the todo queue
985       push @jobstep_todo, $jobstepid;
986     }
987     $Job->{'tasks_summary'}->{'failed'}++;
988   }
989   else
990   {
991     ++$thisround_succeeded;
992     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
993     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
994     push @jobstep_done, $jobstepid;
995     Log ($jobstepid, "success in $elapsed seconds");
996   }
997   $Jobstep->{exitcode} = $childstatus;
998   $Jobstep->{finishtime} = time;
999   process_stderr ($jobstepid, $task_success);
1000   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1001
1002   close $reader{$jobstepid};
1003   delete $reader{$jobstepid};
1004   delete $slot[$proc{$pid}->{slot}]->{pid};
1005   push @freeslot, $proc{$pid}->{slot};
1006   delete $proc{$pid};
1007
1008   if ($task_success) {
1009     # Load new tasks
1010     my $newtask_list = [];
1011     my $newtask_results;
1012     do {
1013       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1014         'where' => {
1015           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1016         },
1017         'order' => 'qsequence',
1018         'offset' => scalar(@$newtask_list),
1019       );
1020       push(@$newtask_list, @{$newtask_results->{items}});
1021     } while (@{$newtask_results->{items}});
1022     foreach my $arvados_task (@$newtask_list) {
1023       my $jobstep = {
1024         'level' => $arvados_task->{'sequence'},
1025         'failures' => 0,
1026         'arvados_task' => $arvados_task
1027       };
1028       push @jobstep, $jobstep;
1029       push @jobstep_todo, $#jobstep;
1030     }
1031   }
1032
1033   $progress_is_dirty = 1;
1034   1;
1035 }
1036
1037 sub check_refresh_wanted
1038 {
1039   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1040   if (@stat && $stat[9] > $latest_refresh) {
1041     $latest_refresh = scalar time;
1042     if ($job_has_uuid) {
1043       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1044       for my $attr ('cancelled_at',
1045                     'cancelled_by_user_uuid',
1046                     'cancelled_by_client_uuid') {
1047         $Job->{$attr} = $Job2->{$attr};
1048       }
1049       if ($Job->{'cancelled_at'}) {
1050         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1051              " by user " . $Job->{cancelled_by_user_uuid});
1052         $main::success = 0;
1053         $main::please_freeze = 1;
1054       }
1055     }
1056   }
1057 }
1058
1059 sub check_squeue
1060 {
1061   # return if the kill list was checked <4 seconds ago
1062   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1063   {
1064     return;
1065   }
1066   $squeue_kill_checked = time;
1067
1068   # use killem() on procs whose killtime is reached
1069   for (keys %proc)
1070   {
1071     if (exists $proc{$_}->{killtime}
1072         && $proc{$_}->{killtime} <= time)
1073     {
1074       killem ($_);
1075     }
1076   }
1077
1078   # return if the squeue was checked <60 seconds ago
1079   if (defined $squeue_checked && $squeue_checked > time - 60)
1080   {
1081     return;
1082   }
1083   $squeue_checked = time;
1084
1085   if (!$have_slurm)
1086   {
1087     # here is an opportunity to check for mysterious problems with local procs
1088     return;
1089   }
1090
1091   # get a list of steps still running
1092   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1093   chop @squeue;
1094   if ($squeue[-1] ne "ok")
1095   {
1096     return;
1097   }
1098   pop @squeue;
1099
1100   # which of my jobsteps are running, according to squeue?
1101   my %ok;
1102   foreach (@squeue)
1103   {
1104     if (/^(\d+)\.(\d+) (\S+)/)
1105     {
1106       if ($1 eq $ENV{SLURM_JOBID})
1107       {
1108         $ok{$3} = 1;
1109       }
1110     }
1111   }
1112
1113   # which of my active child procs (>60s old) were not mentioned by squeue?
1114   foreach (keys %proc)
1115   {
1116     if ($proc{$_}->{time} < time - 60
1117         && !exists $ok{$proc{$_}->{jobstepname}}
1118         && !exists $proc{$_}->{killtime})
1119     {
1120       # kill this proc if it hasn't exited in 30 seconds
1121       $proc{$_}->{killtime} = time + 30;
1122     }
1123   }
1124 }
1125
1126
1127 sub release_allocation
1128 {
1129   if ($have_slurm)
1130   {
1131     Log (undef, "release job allocation");
1132     system "scancel $ENV{SLURM_JOBID}";
1133   }
1134 }
1135
1136
1137 sub readfrompipes
1138 {
1139   my $gotsome = 0;
1140   foreach my $job (keys %reader)
1141   {
1142     my $buf;
1143     while (0 < sysread ($reader{$job}, $buf, 8192))
1144     {
1145       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1146       $jobstep[$job]->{stderr} .= $buf;
1147       preprocess_stderr ($job);
1148       if (length ($jobstep[$job]->{stderr}) > 16384)
1149       {
1150         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1151       }
1152       $gotsome = 1;
1153     }
1154   }
1155   return $gotsome;
1156 }
1157
1158
1159 sub preprocess_stderr
1160 {
1161   my $job = shift;
1162
1163   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1164     my $line = $1;
1165     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1166     Log ($job, "stderr $line");
1167     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1168       # whoa.
1169       $main::please_freeze = 1;
1170     }
1171     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1172       $jobstep[$job]->{node_fail} = 1;
1173       ban_node_by_slot($jobstep[$job]->{slotindex});
1174     }
1175   }
1176 }
1177
1178
1179 sub process_stderr
1180 {
1181   my $job = shift;
1182   my $task_success = shift;
1183   preprocess_stderr ($job);
1184
1185   map {
1186     Log ($job, "stderr $_");
1187   } split ("\n", $jobstep[$job]->{stderr});
1188 }
1189
1190 sub fetch_block
1191 {
1192   my $hash = shift;
1193   my ($keep, $child_out, $output_block);
1194
1195   my $cmd = "arv-get \Q$hash\E";
1196   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1197   $output_block = '';
1198   while (1) {
1199     my $buf;
1200     my $bytes = sysread($keep, $buf, 1024 * 1024);
1201     if (!defined $bytes) {
1202       die "reading from arv-get: $!";
1203     } elsif ($bytes == 0) {
1204       # sysread returns 0 at the end of the pipe.
1205       last;
1206     } else {
1207       # some bytes were read into buf.
1208       $output_block .= $buf;
1209     }
1210   }
1211   close $keep;
1212   return $output_block;
1213 }
1214
1215 sub collate_output
1216 {
1217   Log (undef, "collate");
1218
1219   my ($child_out, $child_in);
1220   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1221   my $joboutput;
1222   for (@jobstep)
1223   {
1224     next if (!exists $_->{'arvados_task'}->{'output'} ||
1225              !$_->{'arvados_task'}->{'success'});
1226     my $output = $_->{'arvados_task'}->{output};
1227     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1228     {
1229       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1230       print $child_in $output;
1231     }
1232     elsif (@jobstep == 1)
1233     {
1234       $joboutput = $output;
1235       last;
1236     }
1237     elsif (defined (my $outblock = fetch_block ($output)))
1238     {
1239       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1240       print $child_in $outblock;
1241     }
1242     else
1243     {
1244       Log (undef, "XXX fetch_block($output) failed XXX");
1245       $main::success = 0;
1246     }
1247   }
1248   $child_in->close;
1249
1250   if (!defined $joboutput) {
1251     my $s = IO::Select->new($child_out);
1252     if ($s->can_read(120)) {
1253       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1254       chomp($joboutput);
1255     } else {
1256       Log (undef, "timed out reading from 'arv-put'");
1257     }
1258   }
1259   waitpid($pid, 0);
1260
1261   return $joboutput;
1262 }
1263
1264
1265 sub killem
1266 {
1267   foreach (@_)
1268   {
1269     my $sig = 2;                # SIGINT first
1270     if (exists $proc{$_}->{"sent_$sig"} &&
1271         time - $proc{$_}->{"sent_$sig"} > 4)
1272     {
1273       $sig = 15;                # SIGTERM if SIGINT doesn't work
1274     }
1275     if (exists $proc{$_}->{"sent_$sig"} &&
1276         time - $proc{$_}->{"sent_$sig"} > 4)
1277     {
1278       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1279     }
1280     if (!exists $proc{$_}->{"sent_$sig"})
1281     {
1282       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1283       kill $sig, $_;
1284       select (undef, undef, undef, 0.1);
1285       if ($sig == 2)
1286       {
1287         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1288       }
1289       $proc{$_}->{"sent_$sig"} = time;
1290       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1291     }
1292   }
1293 }
1294
1295
1296 sub fhbits
1297 {
1298   my($bits);
1299   for (@_) {
1300     vec($bits,fileno($_),1) = 1;
1301   }
1302   $bits;
1303 }
1304
1305
1306 sub Log                         # ($jobstep_id, $logmessage)
1307 {
1308   if ($_[1] =~ /\n/) {
1309     for my $line (split (/\n/, $_[1])) {
1310       Log ($_[0], $line);
1311     }
1312     return;
1313   }
1314   my $fh = select STDERR; $|=1; select $fh;
1315   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1316   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1317   $message .= "\n";
1318   my $datetime;
1319   if ($local_logfile || -t STDERR) {
1320     my @gmtime = gmtime;
1321     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1322                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1323   }
1324   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1325
1326   if ($local_logfile) {
1327     print $local_logfile $datetime . " " . $message;
1328   }
1329 }
1330
1331
1332 sub croak
1333 {
1334   my ($package, $file, $line) = caller;
1335   my $message = "@_ at $file line $line\n";
1336   Log (undef, $message);
1337   freeze() if @jobstep_todo;
1338   collate_output() if @jobstep_todo;
1339   cleanup();
1340   save_meta() if $local_logfile;
1341   die;
1342 }
1343
1344
1345 sub cleanup
1346 {
1347   return if !$job_has_uuid;
1348   $Job->update_attributes('running' => 0,
1349                           'success' => 0,
1350                           'finished_at' => scalar gmtime);
1351 }
1352
1353
1354 sub save_meta
1355 {
1356   my $justcheckpoint = shift; # false if this will be the last meta saved
1357   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1358
1359   $local_logfile->flush;
1360   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1361       . quotemeta($local_logfile->filename);
1362   my $loglocator = `$cmd`;
1363   die "system $cmd failed: $?" if $?;
1364   chomp($loglocator);
1365
1366   $local_logfile = undef;   # the temp file is automatically deleted
1367   Log (undef, "log manifest is $loglocator");
1368   $Job->{'log'} = $loglocator;
1369   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1370 }
1371
1372
1373 sub freeze_if_want_freeze
1374 {
1375   if ($main::please_freeze)
1376   {
1377     release_allocation();
1378     if (@_)
1379     {
1380       # kill some srun procs before freeze+stop
1381       map { $proc{$_} = {} } @_;
1382       while (%proc)
1383       {
1384         killem (keys %proc);
1385         select (undef, undef, undef, 0.1);
1386         my $died;
1387         while (($died = waitpid (-1, WNOHANG)) > 0)
1388         {
1389           delete $proc{$died};
1390         }
1391       }
1392     }
1393     freeze();
1394     collate_output();
1395     cleanup();
1396     save_meta();
1397     exit 0;
1398   }
1399 }
1400
1401
1402 sub freeze
1403 {
1404   Log (undef, "Freeze not implemented");
1405   return;
1406 }
1407
1408
1409 sub thaw
1410 {
1411   croak ("Thaw not implemented");
1412 }
1413
1414
1415 sub freezequote
1416 {
1417   my $s = shift;
1418   $s =~ s/\\/\\\\/g;
1419   $s =~ s/\n/\\n/g;
1420   return $s;
1421 }
1422
1423
1424 sub freezeunquote
1425 {
1426   my $s = shift;
1427   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1428   return $s;
1429 }
1430
1431
1432 sub srun
1433 {
1434   my $srunargs = shift;
1435   my $execargs = shift;
1436   my $opts = shift || {};
1437   my $stdin = shift;
1438   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1439   print STDERR (join (" ",
1440                       map { / / ? "'$_'" : $_ }
1441                       (@$args)),
1442                 "\n")
1443       if $ENV{CRUNCH_DEBUG};
1444
1445   if (defined $stdin) {
1446     my $child = open STDIN, "-|";
1447     defined $child or die "no fork: $!";
1448     if ($child == 0) {
1449       print $stdin or die $!;
1450       close STDOUT or die $!;
1451       exit 0;
1452     }
1453   }
1454
1455   return system (@$args) if $opts->{fork};
1456
1457   exec @$args;
1458   warn "ENV size is ".length(join(" ",%ENV));
1459   die "exec failed: $!: @$args";
1460 }
1461
1462
1463 sub ban_node_by_slot {
1464   # Don't start any new jobsteps on this node for 60 seconds
1465   my $slotid = shift;
1466   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1467   $slot[$slotid]->{node}->{hold_count}++;
1468   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1469 }
1470
1471 sub must_lock_now
1472 {
1473   my ($lockfile, $error_message) = @_;
1474   open L, ">", $lockfile or croak("$lockfile: $!");
1475   if (!flock L, LOCK_EX|LOCK_NB) {
1476     croak("Can't lock $lockfile: $error_message\n");
1477   }
1478 }
1479
1480 sub find_docker_image {
1481   # Given a Keep locator, check to see if it contains a Docker image.
1482   # If so, return its stream name and Docker hash.
1483   # If not, return undef for both values.
1484   my $locator = shift;
1485   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1486     my @file_list = @{$image->{files}};
1487     if ((scalar(@file_list) == 1) &&
1488         ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1489       return ($file_list[0][0], $1);
1490     }
1491   }
1492   return (undef, undef);
1493 }
1494
1495 __DATA__
1496 #!/usr/bin/perl
1497
1498 # checkout-and-build
1499
1500 use Fcntl ':flock';
1501 use File::Path qw( make_path );
1502
1503 my $destdir = $ENV{"CRUNCH_SRC"};
1504 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1505 my $repo = $ENV{"CRUNCH_SRC_URL"};
1506 my $task_work = $ENV{"TASK_WORK"};
1507
1508 for my $dir ($destdir, $task_work) {
1509     if ($dir) {
1510         make_path $dir;
1511         -e $dir or die "Failed to create temporary directory ($dir): $!";
1512     }
1513 }
1514
1515 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1516 flock L, LOCK_EX;
1517 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1518     exit 0;
1519 }
1520
1521 unlink "$destdir.commit";
1522 open STDOUT, ">", "$destdir.log";
1523 open STDERR, ">&STDOUT";
1524
1525 mkdir $destdir;
1526 my @git_archive_data = <DATA>;
1527 if (@git_archive_data) {
1528   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1529   print TARX @git_archive_data;
1530   if(!close(TARX)) {
1531     die "'tar -C $destdir -xf -' exited $?: $!";
1532   }
1533 }
1534
1535 my $pwd;
1536 chomp ($pwd = `pwd`);
1537 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1538 mkdir $install_dir;
1539
1540 for my $src_path ("$destdir/arvados/sdk/python") {
1541   if (-d $src_path) {
1542     shell_or_die ("virtualenv", $install_dir);
1543     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1544   }
1545 }
1546
1547 if (-e "$destdir/crunch_scripts/install") {
1548     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1549 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1550     # Old version
1551     shell_or_die ("./tests/autotests.sh", $install_dir);
1552 } elsif (-e "./install.sh") {
1553     shell_or_die ("./install.sh", $install_dir);
1554 }
1555
1556 if ($commit) {
1557     unlink "$destdir.commit.new";
1558     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1559     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1560 }
1561
1562 close L;
1563
1564 if (@ARGV) {
1565     exec(@ARGV);
1566 } else {
1567     exit 0;
1568 }
1569
1570 sub shell_or_die
1571 {
1572   if ($ENV{"DEBUG"}) {
1573     print STDERR "@_\n";
1574   }
1575   system (@_) == 0
1576       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1577 }
1578
1579 __DATA__