06b3da99a9929823bdc6b91ca4d315bb46878bc0
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 $ENV{"TMPDIR"} ||= "/tmp";
88 unless (defined $ENV{"CRUNCH_TMP"}) {
89   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
90   if ($ENV{"USER"} ne "crunch" && $< != 0) {
91     # use a tmp dir unique for my uid
92     $ENV{"CRUNCH_TMP"} .= "-$<";
93   }
94 }
95
96 # Create the tmp directory if it does not exist
97 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
98   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
99 }
100
101 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
102 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
103 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
104 mkdir ($ENV{"JOB_WORK"});
105
106 my $force_unlock;
107 my $git_dir;
108 my $jobspec;
109 my $job_api_token;
110 my $no_clear_tmp;
111 my $resume_stash;
112 GetOptions('force-unlock' => \$force_unlock,
113            'git-dir=s' => \$git_dir,
114            'job=s' => \$jobspec,
115            'job-api-token=s' => \$job_api_token,
116            'no-clear-tmp' => \$no_clear_tmp,
117            'resume-stash=s' => \$resume_stash,
118     );
119
120 if (defined $job_api_token) {
121   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
122 }
123
124 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
125 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
126 my $local_job = !$job_has_uuid;
127
128
129 $SIG{'USR1'} = sub
130 {
131   $main::ENV{CRUNCH_DEBUG} = 1;
132 };
133 $SIG{'USR2'} = sub
134 {
135   $main::ENV{CRUNCH_DEBUG} = 0;
136 };
137
138
139
140 my $arv = Arvados->new('apiVersion' => 'v1');
141 my $local_logfile;
142
143 my $User = $arv->{'users'}->{'current'}->execute;
144
145 my $Job = {};
146 my $job_id;
147 my $dbh;
148 my $sth;
149 if ($job_has_uuid)
150 {
151   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
152   if (!$force_unlock) {
153     if ($Job->{'is_locked_by_uuid'}) {
154       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
155     }
156     if ($Job->{'success'} ne undef) {
157       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
158     }
159     if ($Job->{'running'}) {
160       croak("Job 'running' flag is already set");
161     }
162     if ($Job->{'started_at'}) {
163       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
164     }
165   }
166 }
167 else
168 {
169   $Job = JSON::decode_json($jobspec);
170
171   if (!$resume_stash)
172   {
173     map { croak ("No $_ specified") unless $Job->{$_} }
174     qw(script script_version script_parameters);
175   }
176
177   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
178   $Job->{'started_at'} = gmtime;
179
180   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
181
182   $job_has_uuid = 1;
183 }
184 $job_id = $Job->{'uuid'};
185
186 my $keep_logfile = $job_id . '.log.txt';
187 $local_logfile = File::Temp->new();
188
189 $Job->{'runtime_constraints'} ||= {};
190 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
191 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
192
193
194 Log (undef, "check slurm allocation");
195 my @slot;
196 my @node;
197 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
198 my @sinfo;
199 if (!$have_slurm)
200 {
201   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
202   push @sinfo, "$localcpus localhost";
203 }
204 if (exists $ENV{SLURM_NODELIST})
205 {
206   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
207 }
208 foreach (@sinfo)
209 {
210   my ($ncpus, $slurm_nodelist) = split;
211   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
212
213   my @nodelist;
214   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
215   {
216     my $nodelist = $1;
217     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
218     {
219       my $ranges = $1;
220       foreach (split (",", $ranges))
221       {
222         my ($a, $b);
223         if (/(\d+)-(\d+)/)
224         {
225           $a = $1;
226           $b = $2;
227         }
228         else
229         {
230           $a = $_;
231           $b = $_;
232         }
233         push @nodelist, map {
234           my $n = $nodelist;
235           $n =~ s/\[[-,\d]+\]/$_/;
236           $n;
237         } ($a..$b);
238       }
239     }
240     else
241     {
242       push @nodelist, $nodelist;
243     }
244   }
245   foreach my $nodename (@nodelist)
246   {
247     Log (undef, "node $nodename - $ncpus slots");
248     my $node = { name => $nodename,
249                  ncpus => $ncpus,
250                  losing_streak => 0,
251                  hold_until => 0 };
252     foreach my $cpu (1..$ncpus)
253     {
254       push @slot, { node => $node,
255                     cpu => $cpu };
256     }
257   }
258   push @node, @nodelist;
259 }
260
261
262
263 # Ensure that we get one jobstep running on each allocated node before
264 # we start overloading nodes with concurrent steps
265
266 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
267
268
269
270 my $jobmanager_id;
271 if ($job_has_uuid)
272 {
273   # Claim this job, and make sure nobody else does
274   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
275           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
276     croak("Error while updating / locking job");
277   }
278   $Job->update_attributes('started_at' => scalar gmtime,
279                           'running' => 1,
280                           'success' => undef,
281                           'tasks_summary' => { 'failed' => 0,
282                                                'todo' => 1,
283                                                'running' => 0,
284                                                'done' => 0 });
285 }
286
287
288 Log (undef, "start");
289 $SIG{'INT'} = sub { $main::please_freeze = 1; };
290 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
291 $SIG{'TERM'} = \&croak;
292 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
293 $SIG{'ALRM'} = sub { $main::please_info = 1; };
294 $SIG{'CONT'} = sub { $main::please_continue = 1; };
295 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
296
297 $main::please_freeze = 0;
298 $main::please_info = 0;
299 $main::please_continue = 0;
300 $main::please_refresh = 0;
301 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
302
303 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
304 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
305 $ENV{"JOB_UUID"} = $job_id;
306
307
308 my @jobstep;
309 my @jobstep_todo = ();
310 my @jobstep_done = ();
311 my @jobstep_tomerge = ();
312 my $jobstep_tomerge_level = 0;
313 my $squeue_checked;
314 my $squeue_kill_checked;
315 my $output_in_keep = 0;
316 my $latest_refresh = scalar time;
317
318
319
320 if (defined $Job->{thawedfromkey})
321 {
322   thaw ($Job->{thawedfromkey});
323 }
324 else
325 {
326   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
327     'job_uuid' => $Job->{'uuid'},
328     'sequence' => 0,
329     'qsequence' => 0,
330     'parameters' => {},
331                                                           });
332   push @jobstep, { 'level' => 0,
333                    'failures' => 0,
334                    'arvados_task' => $first_task,
335                  };
336   push @jobstep_todo, 0;
337 }
338
339
340 if (!$have_slurm)
341 {
342   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
343 }
344
345
346 my $build_script;
347
348
349 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
350
351 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
352 if ($skip_install)
353 {
354   if (!defined $no_clear_tmp) {
355     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
356     system($clear_tmp_cmd) == 0
357         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
358   }
359   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
360   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
361     if (-d $src_path) {
362       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
363           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
364       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
365           == 0
366           or croak ("setup.py in $src_path failed: exit ".($?>>8));
367     }
368   }
369 }
370 else
371 {
372   do {
373     local $/ = undef;
374     $build_script = <DATA>;
375   };
376   Log (undef, "Install revision ".$Job->{script_version});
377   my $nodelist = join(",", @node);
378
379   if (!defined $no_clear_tmp) {
380     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
381
382     my $cleanpid = fork();
383     if ($cleanpid == 0)
384     {
385       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
386             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
387       exit (1);
388     }
389     while (1)
390     {
391       last if $cleanpid == waitpid (-1, WNOHANG);
392       freeze_if_want_freeze ($cleanpid);
393       select (undef, undef, undef, 0.1);
394     }
395     Log (undef, "Clean-work-dir exited $?");
396   }
397
398   # Install requested code version
399
400   my @execargs;
401   my @srunargs = ("srun",
402                   "--nodelist=$nodelist",
403                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
404
405   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
406   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
407
408   my $commit;
409   my $git_archive;
410   my $treeish = $Job->{'script_version'};
411
412   # If we're running under crunch-dispatch, it will have pulled the
413   # appropriate source tree into its own repository, and given us that
414   # repo's path as $git_dir. If we're running a "local" job, and a
415   # script_version was specified, it's up to the user to provide the
416   # full path to a local repository in Job->{repository}.
417   #
418   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
419   # git-archive --remote where appropriate.
420   #
421   # TODO: Accept a locally-hosted Arvados repository by name or
422   # UUID. Use arvados.v1.repositories.list or .get to figure out the
423   # appropriate fetch-url.
424   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
425
426   $ENV{"CRUNCH_SRC_URL"} = $repo;
427
428   if (-d "$repo/.git") {
429     # We were given a working directory, but we are only interested in
430     # the index.
431     $repo = "$repo/.git";
432   }
433
434   # If this looks like a subversion r#, look for it in git-svn commit messages
435
436   if ($treeish =~ m{^\d{1,4}$}) {
437     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
438     chomp $gitlog;
439     if ($gitlog =~ /^[a-f0-9]{40}$/) {
440       $commit = $gitlog;
441       Log (undef, "Using commit $commit for script_version $treeish");
442     }
443   }
444
445   # If that didn't work, try asking git to look it up as a tree-ish.
446
447   if (!defined $commit) {
448     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
449     chomp $found;
450     if ($found =~ /^[0-9a-f]{40}$/s) {
451       $commit = $found;
452       if ($commit ne $treeish) {
453         # Make sure we record the real commit id in the database,
454         # frozentokey, logs, etc. -- instead of an abbreviation or a
455         # branch name which can become ambiguous or point to a
456         # different commit in the future.
457         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
458         Log (undef, "Using commit $commit for tree-ish $treeish");
459         if ($commit ne $treeish) {
460           $Job->{'script_version'} = $commit;
461           !$job_has_uuid or
462               $Job->update_attributes('script_version' => $commit) or
463               croak("Error while updating job");
464         }
465       }
466     }
467   }
468
469   if (defined $commit) {
470     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
471     @execargs = ("sh", "-c",
472                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
473     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
474   }
475   else {
476     croak ("could not figure out commit id for $treeish");
477   }
478
479   my $installpid = fork();
480   if ($installpid == 0)
481   {
482     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
483     exit (1);
484   }
485   while (1)
486   {
487     last if $installpid == waitpid (-1, WNOHANG);
488     freeze_if_want_freeze ($installpid);
489     select (undef, undef, undef, 0.1);
490   }
491   Log (undef, "Install exited $?");
492 }
493
494 if (!$have_slurm)
495 {
496   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
497   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
498 }
499
500 # If this job requires a Docker image, install that.
501 my $docker_bin = "/usr/bin/docker.io";
502 my ($docker_locator, $docker_stream, $docker_hash);
503 if ($docker_locator = $Job->{docker_image_locator}) {
504   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
505   if (!$docker_hash)
506   {
507     croak("No Docker image hash found from locator $docker_locator");
508   }
509   $docker_stream =~ s/^\.//;
510   my $docker_install_script = qq{
511 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
512     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
513 fi
514 };
515   my $docker_pid = fork();
516   if ($docker_pid == 0)
517   {
518     srun (["srun", "--nodelist=" . join(',', @node)],
519           ["/bin/sh", "-ec", $docker_install_script]);
520     exit ($?);
521   }
522   while (1)
523   {
524     last if $docker_pid == waitpid (-1, WNOHANG);
525     freeze_if_want_freeze ($docker_pid);
526     select (undef, undef, undef, 0.1);
527   }
528   if ($? != 0)
529   {
530     croak("Installing Docker image from $docker_locator returned exit code $?");
531   }
532 }
533
534 foreach (qw (script script_version script_parameters runtime_constraints))
535 {
536   Log (undef,
537        "$_ " .
538        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
539 }
540 foreach (split (/\n/, $Job->{knobs}))
541 {
542   Log (undef, "knob " . $_);
543 }
544
545
546
547 $main::success = undef;
548
549
550
551 ONELEVEL:
552
553 my $thisround_succeeded = 0;
554 my $thisround_failed = 0;
555 my $thisround_failed_multiple = 0;
556
557 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
558                        or $a <=> $b } @jobstep_todo;
559 my $level = $jobstep[$jobstep_todo[0]]->{level};
560 Log (undef, "start level $level");
561
562
563
564 my %proc;
565 my @freeslot = (0..$#slot);
566 my @holdslot;
567 my %reader;
568 my $progress_is_dirty = 1;
569 my $progress_stats_updated = 0;
570
571 update_progress_stats();
572
573
574
575 THISROUND:
576 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
577 {
578   my $id = $jobstep_todo[$todo_ptr];
579   my $Jobstep = $jobstep[$id];
580   if ($Jobstep->{level} != $level)
581   {
582     next;
583   }
584
585   pipe $reader{$id}, "writer" or croak ($!);
586   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
587   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
588
589   my $childslot = $freeslot[0];
590   my $childnode = $slot[$childslot]->{node};
591   my $childslotname = join (".",
592                             $slot[$childslot]->{node}->{name},
593                             $slot[$childslot]->{cpu});
594   my $childpid = fork();
595   if ($childpid == 0)
596   {
597     $SIG{'INT'} = 'DEFAULT';
598     $SIG{'QUIT'} = 'DEFAULT';
599     $SIG{'TERM'} = 'DEFAULT';
600
601     foreach (values (%reader))
602     {
603       close($_);
604     }
605     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
606     open(STDOUT,">&writer");
607     open(STDERR,">&writer");
608
609     undef $dbh;
610     undef $sth;
611
612     delete $ENV{"GNUPGHOME"};
613     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
614     $ENV{"TASK_QSEQUENCE"} = $id;
615     $ENV{"TASK_SEQUENCE"} = $level;
616     $ENV{"JOB_SCRIPT"} = $Job->{script};
617     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
618       $param =~ tr/a-z/A-Z/;
619       $ENV{"JOB_PARAMETER_$param"} = $value;
620     }
621     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
622     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
623     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
624     $ENV{"HOME"} = $ENV{"TASK_WORK"};
625     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
626     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
627     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
628     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
629
630     $ENV{"GZIP"} = "-n";
631
632     my @srunargs = (
633       "srun",
634       "--nodelist=".$childnode->{name},
635       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
636       "--job-name=$job_id.$id.$$",
637         );
638     my $build_script_to_send = "";
639     my $command =
640         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
641         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
642         ."&& cd $ENV{CRUNCH_TMP} ";
643     if ($build_script)
644     {
645       $build_script_to_send = $build_script;
646       $command .=
647           "&& perl -";
648     }
649     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
650     if ($docker_hash)
651     {
652       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
653       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
654       # Dynamically configure the container to use the host system as its
655       # DNS server.  Get the host's global addresses from the ip command,
656       # and turn them into docker --dns options using gawk.
657       $command .=
658           q{$(ip -o address show scope global |
659               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
660       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
661       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
662       $command .= "--env=\QHOME=/home/crunch\E ";
663       while (my ($env_key, $env_val) = each %ENV)
664       {
665         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
666           if ($env_key eq "TASK_WORK") {
667             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
668           }
669           elsif ($env_key eq "TASK_KEEPMOUNT") {
670             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
671           }
672           else {
673             $command .= "--env=\Q$env_key=$env_val\E ";
674           }
675         }
676       }
677       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
678       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
679       $command .= "\Q$docker_hash\E ";
680       $command .= "stdbuf --output=0 --error=0 ";
681       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
682     } else {
683       # Non-docker run
684       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
685       $command .= "stdbuf --output=0 --error=0 ";
686       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
687     }
688
689     my @execargs = ('bash', '-c', $command);
690     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
691     exit (111);
692   }
693   close("writer");
694   if (!defined $childpid)
695   {
696     close $reader{$id};
697     delete $reader{$id};
698     next;
699   }
700   shift @freeslot;
701   $proc{$childpid} = { jobstep => $id,
702                        time => time,
703                        slot => $childslot,
704                        jobstepname => "$job_id.$id.$childpid",
705                      };
706   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
707   $slot[$childslot]->{pid} = $childpid;
708
709   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
710   Log ($id, "child $childpid started on $childslotname");
711   $Jobstep->{starttime} = time;
712   $Jobstep->{node} = $childnode->{name};
713   $Jobstep->{slotindex} = $childslot;
714   delete $Jobstep->{stderr};
715   delete $Jobstep->{finishtime};
716
717   splice @jobstep_todo, $todo_ptr, 1;
718   --$todo_ptr;
719
720   $progress_is_dirty = 1;
721
722   while (!@freeslot
723          ||
724          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
725   {
726     last THISROUND if $main::please_freeze;
727     if ($main::please_info)
728     {
729       $main::please_info = 0;
730       freeze();
731       collate_output();
732       save_meta(1);
733       update_progress_stats();
734     }
735     my $gotsome
736         = readfrompipes ()
737         + reapchildren ();
738     if (!$gotsome)
739     {
740       check_refresh_wanted();
741       check_squeue();
742       update_progress_stats();
743       select (undef, undef, undef, 0.1);
744     }
745     elsif (time - $progress_stats_updated >= 30)
746     {
747       update_progress_stats();
748     }
749     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
750         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
751     {
752       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
753           .($thisround_failed+$thisround_succeeded)
754           .") -- giving up on this round";
755       Log (undef, $message);
756       last THISROUND;
757     }
758
759     # move slots from freeslot to holdslot (or back to freeslot) if necessary
760     for (my $i=$#freeslot; $i>=0; $i--) {
761       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
762         push @holdslot, (splice @freeslot, $i, 1);
763       }
764     }
765     for (my $i=$#holdslot; $i>=0; $i--) {
766       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
767         push @freeslot, (splice @holdslot, $i, 1);
768       }
769     }
770
771     # give up if no nodes are succeeding
772     if (!grep { $_->{node}->{losing_streak} == 0 &&
773                     $_->{node}->{hold_count} < 4 } @slot) {
774       my $message = "Every node has failed -- giving up on this round";
775       Log (undef, $message);
776       last THISROUND;
777     }
778   }
779 }
780
781
782 push @freeslot, splice @holdslot;
783 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
784
785
786 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
787 while (%proc)
788 {
789   if ($main::please_continue) {
790     $main::please_continue = 0;
791     goto THISROUND;
792   }
793   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
794   readfrompipes ();
795   if (!reapchildren())
796   {
797     check_refresh_wanted();
798     check_squeue();
799     update_progress_stats();
800     select (undef, undef, undef, 0.1);
801     killem (keys %proc) if $main::please_freeze;
802   }
803 }
804
805 update_progress_stats();
806 freeze_if_want_freeze();
807
808
809 if (!defined $main::success)
810 {
811   if (@jobstep_todo &&
812       $thisround_succeeded == 0 &&
813       ($thisround_failed == 0 || $thisround_failed > 4))
814   {
815     my $message = "stop because $thisround_failed tasks failed and none succeeded";
816     Log (undef, $message);
817     $main::success = 0;
818   }
819   if (!@jobstep_todo)
820   {
821     $main::success = 1;
822   }
823 }
824
825 goto ONELEVEL if !defined $main::success;
826
827
828 release_allocation();
829 freeze();
830 my $collated_output = &collate_output();
831
832 if ($job_has_uuid) {
833   $Job->update_attributes('running' => 0,
834                           'success' => $collated_output && $main::success,
835                           'finished_at' => scalar gmtime)
836 }
837
838 if (!$collated_output) {
839   Log(undef, "output undef");
840 }
841 else {
842   eval {
843     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
844         or die "failed to get collated manifest: $!";
845     # Read the original manifest, and strip permission hints from it,
846     # so we can put the result in a Collection.
847     my @stripped_manifest_lines = ();
848     my $orig_manifest_text = '';
849     while (my $manifest_line = <$orig_manifest>) {
850       $orig_manifest_text .= $manifest_line;
851       my @words = split(/ /, $manifest_line, -1);
852       foreach my $ii (0..$#words) {
853         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
854           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
855         }
856       }
857       push(@stripped_manifest_lines, join(" ", @words));
858     }
859     my $stripped_manifest_text = join("", @stripped_manifest_lines);
860     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
861       'uuid' => md5_hex($stripped_manifest_text),
862       'manifest_text' => $orig_manifest_text,
863     });
864     Log(undef, "output " . $output->{uuid});
865     $Job->update_attributes('output' => $output->{uuid}) if $job_has_uuid;
866     if ($Job->{'output_is_persistent'}) {
867       $arv->{'links'}->{'create'}->execute('link' => {
868         'tail_kind' => 'arvados#user',
869         'tail_uuid' => $User->{'uuid'},
870         'head_kind' => 'arvados#collection',
871         'head_uuid' => $Job->{'output'},
872         'link_class' => 'resources',
873         'name' => 'wants',
874       });
875     }
876   };
877   if ($@) {
878     Log (undef, "Failed to register output manifest: $@");
879   }
880 }
881
882 Log (undef, "finish");
883
884 save_meta();
885 exit 0;
886
887
888
889 sub update_progress_stats
890 {
891   $progress_stats_updated = time;
892   return if !$progress_is_dirty;
893   my ($todo, $done, $running) = (scalar @jobstep_todo,
894                                  scalar @jobstep_done,
895                                  scalar @slot - scalar @freeslot - scalar @holdslot);
896   $Job->{'tasks_summary'} ||= {};
897   $Job->{'tasks_summary'}->{'todo'} = $todo;
898   $Job->{'tasks_summary'}->{'done'} = $done;
899   $Job->{'tasks_summary'}->{'running'} = $running;
900   if ($job_has_uuid) {
901     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
902   }
903   Log (undef, "status: $done done, $running running, $todo todo");
904   $progress_is_dirty = 0;
905 }
906
907
908
909 sub reapchildren
910 {
911   my $pid = waitpid (-1, WNOHANG);
912   return 0 if $pid <= 0;
913
914   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
915                   . "."
916                   . $slot[$proc{$pid}->{slot}]->{cpu});
917   my $jobstepid = $proc{$pid}->{jobstep};
918   my $elapsed = time - $proc{$pid}->{time};
919   my $Jobstep = $jobstep[$jobstepid];
920
921   my $childstatus = $?;
922   my $exitvalue = $childstatus >> 8;
923   my $exitinfo = sprintf("exit %d signal %d%s",
924                          $exitvalue,
925                          $childstatus & 127,
926                          ($childstatus & 128 ? ' core dump' : ''));
927   $Jobstep->{'arvados_task'}->reload;
928   my $task_success = $Jobstep->{'arvados_task'}->{success};
929
930   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
931
932   if (!defined $task_success) {
933     # task did not indicate one way or the other --> fail
934     $Jobstep->{'arvados_task'}->{success} = 0;
935     $Jobstep->{'arvados_task'}->save;
936     $task_success = 0;
937   }
938
939   if (!$task_success)
940   {
941     my $temporary_fail;
942     $temporary_fail ||= $Jobstep->{node_fail};
943     $temporary_fail ||= ($exitvalue == 111);
944
945     ++$thisround_failed;
946     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
947
948     # Check for signs of a failed or misconfigured node
949     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
950         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
951       # Don't count this against jobstep failure thresholds if this
952       # node is already suspected faulty and srun exited quickly
953       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
954           $elapsed < 5) {
955         Log ($jobstepid, "blaming failure on suspect node " .
956              $slot[$proc{$pid}->{slot}]->{node}->{name});
957         $temporary_fail ||= 1;
958       }
959       ban_node_by_slot($proc{$pid}->{slot});
960     }
961
962     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
963                              ++$Jobstep->{'failures'},
964                              $temporary_fail ? 'temporary ' : 'permanent',
965                              $elapsed));
966
967     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
968       # Give up on this task, and the whole job
969       $main::success = 0;
970       $main::please_freeze = 1;
971     }
972     else {
973       # Put this task back on the todo queue
974       push @jobstep_todo, $jobstepid;
975     }
976     $Job->{'tasks_summary'}->{'failed'}++;
977   }
978   else
979   {
980     ++$thisround_succeeded;
981     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
982     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
983     push @jobstep_done, $jobstepid;
984     Log ($jobstepid, "success in $elapsed seconds");
985   }
986   $Jobstep->{exitcode} = $childstatus;
987   $Jobstep->{finishtime} = time;
988   process_stderr ($jobstepid, $task_success);
989   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
990
991   close $reader{$jobstepid};
992   delete $reader{$jobstepid};
993   delete $slot[$proc{$pid}->{slot}]->{pid};
994   push @freeslot, $proc{$pid}->{slot};
995   delete $proc{$pid};
996
997   if ($task_success) {
998     # Load new tasks
999     my $newtask_list = [];
1000     my $newtask_results;
1001     do {
1002       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1003         'where' => {
1004           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1005         },
1006         'order' => 'qsequence',
1007         'offset' => scalar(@$newtask_list),
1008       );
1009       push(@$newtask_list, @{$newtask_results->{items}});
1010     } while (@{$newtask_results->{items}});
1011     foreach my $arvados_task (@$newtask_list) {
1012       my $jobstep = {
1013         'level' => $arvados_task->{'sequence'},
1014         'failures' => 0,
1015         'arvados_task' => $arvados_task
1016       };
1017       push @jobstep, $jobstep;
1018       push @jobstep_todo, $#jobstep;
1019     }
1020   }
1021
1022   $progress_is_dirty = 1;
1023   1;
1024 }
1025
1026 sub check_refresh_wanted
1027 {
1028   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1029   if (@stat && $stat[9] > $latest_refresh) {
1030     $latest_refresh = scalar time;
1031     if ($job_has_uuid) {
1032       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1033       for my $attr ('cancelled_at',
1034                     'cancelled_by_user_uuid',
1035                     'cancelled_by_client_uuid') {
1036         $Job->{$attr} = $Job2->{$attr};
1037       }
1038       if ($Job->{'cancelled_at'}) {
1039         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1040              " by user " . $Job->{cancelled_by_user_uuid});
1041         $main::success = 0;
1042         $main::please_freeze = 1;
1043       }
1044     }
1045   }
1046 }
1047
1048 sub check_squeue
1049 {
1050   # return if the kill list was checked <4 seconds ago
1051   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1052   {
1053     return;
1054   }
1055   $squeue_kill_checked = time;
1056
1057   # use killem() on procs whose killtime is reached
1058   for (keys %proc)
1059   {
1060     if (exists $proc{$_}->{killtime}
1061         && $proc{$_}->{killtime} <= time)
1062     {
1063       killem ($_);
1064     }
1065   }
1066
1067   # return if the squeue was checked <60 seconds ago
1068   if (defined $squeue_checked && $squeue_checked > time - 60)
1069   {
1070     return;
1071   }
1072   $squeue_checked = time;
1073
1074   if (!$have_slurm)
1075   {
1076     # here is an opportunity to check for mysterious problems with local procs
1077     return;
1078   }
1079
1080   # get a list of steps still running
1081   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1082   chop @squeue;
1083   if ($squeue[-1] ne "ok")
1084   {
1085     return;
1086   }
1087   pop @squeue;
1088
1089   # which of my jobsteps are running, according to squeue?
1090   my %ok;
1091   foreach (@squeue)
1092   {
1093     if (/^(\d+)\.(\d+) (\S+)/)
1094     {
1095       if ($1 eq $ENV{SLURM_JOBID})
1096       {
1097         $ok{$3} = 1;
1098       }
1099     }
1100   }
1101
1102   # which of my active child procs (>60s old) were not mentioned by squeue?
1103   foreach (keys %proc)
1104   {
1105     if ($proc{$_}->{time} < time - 60
1106         && !exists $ok{$proc{$_}->{jobstepname}}
1107         && !exists $proc{$_}->{killtime})
1108     {
1109       # kill this proc if it hasn't exited in 30 seconds
1110       $proc{$_}->{killtime} = time + 30;
1111     }
1112   }
1113 }
1114
1115
1116 sub release_allocation
1117 {
1118   if ($have_slurm)
1119   {
1120     Log (undef, "release job allocation");
1121     system "scancel $ENV{SLURM_JOBID}";
1122   }
1123 }
1124
1125
1126 sub readfrompipes
1127 {
1128   my $gotsome = 0;
1129   foreach my $job (keys %reader)
1130   {
1131     my $buf;
1132     while (0 < sysread ($reader{$job}, $buf, 8192))
1133     {
1134       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1135       $jobstep[$job]->{stderr} .= $buf;
1136       preprocess_stderr ($job);
1137       if (length ($jobstep[$job]->{stderr}) > 16384)
1138       {
1139         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1140       }
1141       $gotsome = 1;
1142     }
1143   }
1144   return $gotsome;
1145 }
1146
1147
1148 sub preprocess_stderr
1149 {
1150   my $job = shift;
1151
1152   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1153     my $line = $1;
1154     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1155     Log ($job, "stderr $line");
1156     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1157       # whoa.
1158       $main::please_freeze = 1;
1159     }
1160     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1161       $jobstep[$job]->{node_fail} = 1;
1162       ban_node_by_slot($jobstep[$job]->{slotindex});
1163     }
1164   }
1165 }
1166
1167
1168 sub process_stderr
1169 {
1170   my $job = shift;
1171   my $task_success = shift;
1172   preprocess_stderr ($job);
1173
1174   map {
1175     Log ($job, "stderr $_");
1176   } split ("\n", $jobstep[$job]->{stderr});
1177 }
1178
1179 sub fetch_block
1180 {
1181   my $hash = shift;
1182   my ($keep, $child_out, $output_block);
1183
1184   my $cmd = "arv-get \Q$hash\E";
1185   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1186   $output_block = '';
1187   while (1) {
1188     my $buf;
1189     my $bytes = sysread($keep, $buf, 1024 * 1024);
1190     if (!defined $bytes) {
1191       die "reading from arv-get: $!";
1192     } elsif ($bytes == 0) {
1193       # sysread returns 0 at the end of the pipe.
1194       last;
1195     } else {
1196       # some bytes were read into buf.
1197       $output_block .= $buf;
1198     }
1199   }
1200   close $keep;
1201   return $output_block;
1202 }
1203
1204 sub collate_output
1205 {
1206   Log (undef, "collate");
1207
1208   my ($child_out, $child_in);
1209   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1210   my $joboutput;
1211   for (@jobstep)
1212   {
1213     next if (!exists $_->{'arvados_task'}->{output} ||
1214              !$_->{'arvados_task'}->{'success'} ||
1215              $_->{'exitcode'} != 0);
1216     my $output = $_->{'arvados_task'}->{output};
1217     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1218     {
1219       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1220       print $child_in $output;
1221     }
1222     elsif (@jobstep == 1)
1223     {
1224       $joboutput = $output;
1225       last;
1226     }
1227     elsif (defined (my $outblock = fetch_block ($output)))
1228     {
1229       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1230       print $child_in $outblock;
1231     }
1232     else
1233     {
1234       Log (undef, "XXX fetch_block($output) failed XXX");
1235       $main::success = 0;
1236     }
1237   }
1238   $child_in->close;
1239
1240   if (!defined $joboutput) {
1241     my $s = IO::Select->new($child_out);
1242     if ($s->can_read(120)) {
1243       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1244       chomp($joboutput);
1245     } else {
1246       Log (undef, "timed out reading from 'arv-put'");
1247     }
1248   }
1249   waitpid($pid, 0);
1250
1251   return $joboutput;
1252 }
1253
1254
1255 sub killem
1256 {
1257   foreach (@_)
1258   {
1259     my $sig = 2;                # SIGINT first
1260     if (exists $proc{$_}->{"sent_$sig"} &&
1261         time - $proc{$_}->{"sent_$sig"} > 4)
1262     {
1263       $sig = 15;                # SIGTERM if SIGINT doesn't work
1264     }
1265     if (exists $proc{$_}->{"sent_$sig"} &&
1266         time - $proc{$_}->{"sent_$sig"} > 4)
1267     {
1268       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1269     }
1270     if (!exists $proc{$_}->{"sent_$sig"})
1271     {
1272       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1273       kill $sig, $_;
1274       select (undef, undef, undef, 0.1);
1275       if ($sig == 2)
1276       {
1277         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1278       }
1279       $proc{$_}->{"sent_$sig"} = time;
1280       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1281     }
1282   }
1283 }
1284
1285
1286 sub fhbits
1287 {
1288   my($bits);
1289   for (@_) {
1290     vec($bits,fileno($_),1) = 1;
1291   }
1292   $bits;
1293 }
1294
1295
1296 sub Log                         # ($jobstep_id, $logmessage)
1297 {
1298   if ($_[1] =~ /\n/) {
1299     for my $line (split (/\n/, $_[1])) {
1300       Log ($_[0], $line);
1301     }
1302     return;
1303   }
1304   my $fh = select STDERR; $|=1; select $fh;
1305   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1306   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1307   $message .= "\n";
1308   my $datetime;
1309   if ($local_logfile || -t STDERR) {
1310     my @gmtime = gmtime;
1311     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1312                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1313   }
1314   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1315
1316   if ($local_logfile) {
1317     print $local_logfile $datetime . " " . $message;
1318   }
1319 }
1320
1321
1322 sub croak
1323 {
1324   my ($package, $file, $line) = caller;
1325   my $message = "@_ at $file line $line\n";
1326   Log (undef, $message);
1327   freeze() if @jobstep_todo;
1328   collate_output() if @jobstep_todo;
1329   cleanup();
1330   save_meta() if $local_logfile;
1331   die;
1332 }
1333
1334
1335 sub cleanup
1336 {
1337   return if !$job_has_uuid;
1338   $Job->update_attributes('running' => 0,
1339                           'success' => 0,
1340                           'finished_at' => scalar gmtime);
1341 }
1342
1343
1344 sub save_meta
1345 {
1346   my $justcheckpoint = shift; # false if this will be the last meta saved
1347   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1348
1349   $local_logfile->flush;
1350   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1351       . quotemeta($local_logfile->filename);
1352   my $loglocator = `$cmd`;
1353   die "system $cmd failed: $?" if $?;
1354   chomp($loglocator);
1355
1356   $local_logfile = undef;   # the temp file is automatically deleted
1357   Log (undef, "log manifest is $loglocator");
1358   $Job->{'log'} = $loglocator;
1359   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1360 }
1361
1362
1363 sub freeze_if_want_freeze
1364 {
1365   if ($main::please_freeze)
1366   {
1367     release_allocation();
1368     if (@_)
1369     {
1370       # kill some srun procs before freeze+stop
1371       map { $proc{$_} = {} } @_;
1372       while (%proc)
1373       {
1374         killem (keys %proc);
1375         select (undef, undef, undef, 0.1);
1376         my $died;
1377         while (($died = waitpid (-1, WNOHANG)) > 0)
1378         {
1379           delete $proc{$died};
1380         }
1381       }
1382     }
1383     freeze();
1384     collate_output();
1385     cleanup();
1386     save_meta();
1387     exit 0;
1388   }
1389 }
1390
1391
1392 sub freeze
1393 {
1394   Log (undef, "Freeze not implemented");
1395   return;
1396 }
1397
1398
1399 sub thaw
1400 {
1401   croak ("Thaw not implemented");
1402 }
1403
1404
1405 sub freezequote
1406 {
1407   my $s = shift;
1408   $s =~ s/\\/\\\\/g;
1409   $s =~ s/\n/\\n/g;
1410   return $s;
1411 }
1412
1413
1414 sub freezeunquote
1415 {
1416   my $s = shift;
1417   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1418   return $s;
1419 }
1420
1421
1422 sub srun
1423 {
1424   my $srunargs = shift;
1425   my $execargs = shift;
1426   my $opts = shift || {};
1427   my $stdin = shift;
1428   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1429   print STDERR (join (" ",
1430                       map { / / ? "'$_'" : $_ }
1431                       (@$args)),
1432                 "\n")
1433       if $ENV{CRUNCH_DEBUG};
1434
1435   if (defined $stdin) {
1436     my $child = open STDIN, "-|";
1437     defined $child or die "no fork: $!";
1438     if ($child == 0) {
1439       print $stdin or die $!;
1440       close STDOUT or die $!;
1441       exit 0;
1442     }
1443   }
1444
1445   return system (@$args) if $opts->{fork};
1446
1447   exec @$args;
1448   warn "ENV size is ".length(join(" ",%ENV));
1449   die "exec failed: $!: @$args";
1450 }
1451
1452
1453 sub ban_node_by_slot {
1454   # Don't start any new jobsteps on this node for 60 seconds
1455   my $slotid = shift;
1456   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1457   $slot[$slotid]->{node}->{hold_count}++;
1458   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1459 }
1460
1461 sub must_lock_now
1462 {
1463   my ($lockfile, $error_message) = @_;
1464   open L, ">", $lockfile or croak("$lockfile: $!");
1465   if (!flock L, LOCK_EX|LOCK_NB) {
1466     croak("Can't lock $lockfile: $error_message\n");
1467   }
1468 }
1469
1470 sub find_docker_image {
1471   # Given a Keep locator, check to see if it contains a Docker image.
1472   # If so, return its stream name and Docker hash.
1473   # If not, return undef for both values.
1474   my $locator = shift;
1475   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1476     my @file_list = @{$image->{files}};
1477     if ((scalar(@file_list) == 1) &&
1478         ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1479       return ($file_list[0][0], $1);
1480     }
1481   }
1482   return (undef, undef);
1483 }
1484
1485 __DATA__
1486 #!/usr/bin/perl
1487
1488 # checkout-and-build
1489
1490 use Fcntl ':flock';
1491
1492 my $destdir = $ENV{"CRUNCH_SRC"};
1493 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1494 my $repo = $ENV{"CRUNCH_SRC_URL"};
1495
1496 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1497 flock L, LOCK_EX;
1498 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1499     exit 0;
1500 }
1501
1502 unlink "$destdir.commit";
1503 open STDOUT, ">", "$destdir.log";
1504 open STDERR, ">&STDOUT";
1505
1506 mkdir $destdir;
1507 my @git_archive_data = <DATA>;
1508 if (@git_archive_data) {
1509   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1510   print TARX @git_archive_data;
1511   if(!close(TARX)) {
1512     die "'tar -C $destdir -xf -' exited $?: $!";
1513   }
1514 }
1515
1516 my $pwd;
1517 chomp ($pwd = `pwd`);
1518 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1519 mkdir $install_dir;
1520
1521 for my $src_path ("$destdir/arvados/sdk/python") {
1522   if (-d $src_path) {
1523     shell_or_die ("virtualenv", $install_dir);
1524     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1525   }
1526 }
1527
1528 if (-e "$destdir/crunch_scripts/install") {
1529     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1530 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1531     # Old version
1532     shell_or_die ("./tests/autotests.sh", $install_dir);
1533 } elsif (-e "./install.sh") {
1534     shell_or_die ("./install.sh", $install_dir);
1535 }
1536
1537 if ($commit) {
1538     unlink "$destdir.commit.new";
1539     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1540     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1541 }
1542
1543 close L;
1544
1545 exit 0;
1546
1547 sub shell_or_die
1548 {
1549   if ($ENV{"DEBUG"}) {
1550     print STDERR "@_\n";
1551   }
1552   system (@_) == 0
1553       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1554 }
1555
1556 __DATA__