Merge branch 'master' into 3193-manage-account
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 $ENV{"TMPDIR"} ||= "/tmp";
88 unless (defined $ENV{"CRUNCH_TMP"}) {
89   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
90   if ($ENV{"USER"} ne "crunch" && $< != 0) {
91     # use a tmp dir unique for my uid
92     $ENV{"CRUNCH_TMP"} .= "-$<";
93   }
94 }
95
96 # Create the tmp directory if it does not exist
97 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
98   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
99 }
100
101 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
102 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
103 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
104 mkdir ($ENV{"JOB_WORK"});
105
106 my $force_unlock;
107 my $git_dir;
108 my $jobspec;
109 my $job_api_token;
110 my $no_clear_tmp;
111 my $resume_stash;
112 GetOptions('force-unlock' => \$force_unlock,
113            'git-dir=s' => \$git_dir,
114            'job=s' => \$jobspec,
115            'job-api-token=s' => \$job_api_token,
116            'no-clear-tmp' => \$no_clear_tmp,
117            'resume-stash=s' => \$resume_stash,
118     );
119
120 if (defined $job_api_token) {
121   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
122 }
123
124 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
125 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
126 my $local_job = !$job_has_uuid;
127
128
129 $SIG{'USR1'} = sub
130 {
131   $main::ENV{CRUNCH_DEBUG} = 1;
132 };
133 $SIG{'USR2'} = sub
134 {
135   $main::ENV{CRUNCH_DEBUG} = 0;
136 };
137
138
139
140 my $arv = Arvados->new('apiVersion' => 'v1');
141 my $local_logfile;
142
143 my $User = $arv->{'users'}->{'current'}->execute;
144
145 my $Job = {};
146 my $job_id;
147 my $dbh;
148 my $sth;
149 if ($job_has_uuid)
150 {
151   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
152   if (!$force_unlock) {
153     if ($Job->{'is_locked_by_uuid'}) {
154       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
155     }
156     if ($Job->{'success'} ne undef) {
157       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
158     }
159     if ($Job->{'running'}) {
160       croak("Job 'running' flag is already set");
161     }
162     if ($Job->{'started_at'}) {
163       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
164     }
165   }
166 }
167 else
168 {
169   $Job = JSON::decode_json($jobspec);
170
171   if (!$resume_stash)
172   {
173     map { croak ("No $_ specified") unless $Job->{$_} }
174     qw(script script_version script_parameters);
175   }
176
177   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
178   $Job->{'started_at'} = gmtime;
179
180   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
181
182   $job_has_uuid = 1;
183 }
184 $job_id = $Job->{'uuid'};
185
186 my $keep_logfile = $job_id . '.log.txt';
187 $local_logfile = File::Temp->new();
188
189 $Job->{'runtime_constraints'} ||= {};
190 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
191 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
192
193
194 Log (undef, "check slurm allocation");
195 my @slot;
196 my @node;
197 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
198 my @sinfo;
199 if (!$have_slurm)
200 {
201   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
202   push @sinfo, "$localcpus localhost";
203 }
204 if (exists $ENV{SLURM_NODELIST})
205 {
206   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
207 }
208 foreach (@sinfo)
209 {
210   my ($ncpus, $slurm_nodelist) = split;
211   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
212
213   my @nodelist;
214   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
215   {
216     my $nodelist = $1;
217     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
218     {
219       my $ranges = $1;
220       foreach (split (",", $ranges))
221       {
222         my ($a, $b);
223         if (/(\d+)-(\d+)/)
224         {
225           $a = $1;
226           $b = $2;
227         }
228         else
229         {
230           $a = $_;
231           $b = $_;
232         }
233         push @nodelist, map {
234           my $n = $nodelist;
235           $n =~ s/\[[-,\d]+\]/$_/;
236           $n;
237         } ($a..$b);
238       }
239     }
240     else
241     {
242       push @nodelist, $nodelist;
243     }
244   }
245   foreach my $nodename (@nodelist)
246   {
247     Log (undef, "node $nodename - $ncpus slots");
248     my $node = { name => $nodename,
249                  ncpus => $ncpus,
250                  losing_streak => 0,
251                  hold_until => 0 };
252     foreach my $cpu (1..$ncpus)
253     {
254       push @slot, { node => $node,
255                     cpu => $cpu };
256     }
257   }
258   push @node, @nodelist;
259 }
260
261
262
263 # Ensure that we get one jobstep running on each allocated node before
264 # we start overloading nodes with concurrent steps
265
266 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
267
268
269
270 my $jobmanager_id;
271 if ($job_has_uuid)
272 {
273   # Claim this job, and make sure nobody else does
274   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
275           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
276     croak("Error while updating / locking job");
277   }
278   $Job->update_attributes('started_at' => scalar gmtime,
279                           'running' => 1,
280                           'success' => undef,
281                           'tasks_summary' => { 'failed' => 0,
282                                                'todo' => 1,
283                                                'running' => 0,
284                                                'done' => 0 });
285 }
286
287
288 Log (undef, "start");
289 $SIG{'INT'} = sub { $main::please_freeze = 1; };
290 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
291 $SIG{'TERM'} = \&croak;
292 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
293 $SIG{'ALRM'} = sub { $main::please_info = 1; };
294 $SIG{'CONT'} = sub { $main::please_continue = 1; };
295 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
296
297 $main::please_freeze = 0;
298 $main::please_info = 0;
299 $main::please_continue = 0;
300 $main::please_refresh = 0;
301 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
302
303 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
304 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
305 $ENV{"JOB_UUID"} = $job_id;
306
307
308 my @jobstep;
309 my @jobstep_todo = ();
310 my @jobstep_done = ();
311 my @jobstep_tomerge = ();
312 my $jobstep_tomerge_level = 0;
313 my $squeue_checked;
314 my $squeue_kill_checked;
315 my $output_in_keep = 0;
316 my $latest_refresh = scalar time;
317
318
319
320 if (defined $Job->{thawedfromkey})
321 {
322   thaw ($Job->{thawedfromkey});
323 }
324 else
325 {
326   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
327     'job_uuid' => $Job->{'uuid'},
328     'sequence' => 0,
329     'qsequence' => 0,
330     'parameters' => {},
331                                                           });
332   push @jobstep, { 'level' => 0,
333                    'failures' => 0,
334                    'arvados_task' => $first_task,
335                  };
336   push @jobstep_todo, 0;
337 }
338
339
340 if (!$have_slurm)
341 {
342   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
343 }
344
345
346 my $build_script;
347
348
349 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
350
351 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
352 if ($skip_install)
353 {
354   if (!defined $no_clear_tmp) {
355     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
356     system($clear_tmp_cmd) == 0
357         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
358   }
359   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
360   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
361     if (-d $src_path) {
362       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
363           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
364       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
365           == 0
366           or croak ("setup.py in $src_path failed: exit ".($?>>8));
367     }
368   }
369 }
370 else
371 {
372   do {
373     local $/ = undef;
374     $build_script = <DATA>;
375   };
376   Log (undef, "Install revision ".$Job->{script_version});
377   my $nodelist = join(",", @node);
378
379   if (!defined $no_clear_tmp) {
380     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
381
382     my $cleanpid = fork();
383     if ($cleanpid == 0)
384     {
385       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
386             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
387       exit (1);
388     }
389     while (1)
390     {
391       last if $cleanpid == waitpid (-1, WNOHANG);
392       freeze_if_want_freeze ($cleanpid);
393       select (undef, undef, undef, 0.1);
394     }
395     Log (undef, "Clean-work-dir exited $?");
396   }
397
398   # Install requested code version
399
400   my @execargs;
401   my @srunargs = ("srun",
402                   "--nodelist=$nodelist",
403                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
404
405   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
406   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
407
408   my $commit;
409   my $git_archive;
410   my $treeish = $Job->{'script_version'};
411
412   # If we're running under crunch-dispatch, it will have pulled the
413   # appropriate source tree into its own repository, and given us that
414   # repo's path as $git_dir. If we're running a "local" job, and a
415   # script_version was specified, it's up to the user to provide the
416   # full path to a local repository in Job->{repository}.
417   #
418   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
419   # git-archive --remote where appropriate.
420   #
421   # TODO: Accept a locally-hosted Arvados repository by name or
422   # UUID. Use arvados.v1.repositories.list or .get to figure out the
423   # appropriate fetch-url.
424   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
425
426   $ENV{"CRUNCH_SRC_URL"} = $repo;
427
428   if (-d "$repo/.git") {
429     # We were given a working directory, but we are only interested in
430     # the index.
431     $repo = "$repo/.git";
432   }
433
434   # If this looks like a subversion r#, look for it in git-svn commit messages
435
436   if ($treeish =~ m{^\d{1,4}$}) {
437     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
438     chomp $gitlog;
439     if ($gitlog =~ /^[a-f0-9]{40}$/) {
440       $commit = $gitlog;
441       Log (undef, "Using commit $commit for script_version $treeish");
442     }
443   }
444
445   # If that didn't work, try asking git to look it up as a tree-ish.
446
447   if (!defined $commit) {
448     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
449     chomp $found;
450     if ($found =~ /^[0-9a-f]{40}$/s) {
451       $commit = $found;
452       if ($commit ne $treeish) {
453         # Make sure we record the real commit id in the database,
454         # frozentokey, logs, etc. -- instead of an abbreviation or a
455         # branch name which can become ambiguous or point to a
456         # different commit in the future.
457         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
458         Log (undef, "Using commit $commit for tree-ish $treeish");
459         if ($commit ne $treeish) {
460           $Job->{'script_version'} = $commit;
461           !$job_has_uuid or
462               $Job->update_attributes('script_version' => $commit) or
463               croak("Error while updating job");
464         }
465       }
466     }
467   }
468
469   if (defined $commit) {
470     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
471     @execargs = ("sh", "-c",
472                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
473     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
474   }
475   else {
476     croak ("could not figure out commit id for $treeish");
477   }
478
479   my $installpid = fork();
480   if ($installpid == 0)
481   {
482     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
483     exit (1);
484   }
485   while (1)
486   {
487     last if $installpid == waitpid (-1, WNOHANG);
488     freeze_if_want_freeze ($installpid);
489     select (undef, undef, undef, 0.1);
490   }
491   Log (undef, "Install exited $?");
492 }
493
494 if (!$have_slurm)
495 {
496   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
497   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
498 }
499
500 # If this job requires a Docker image, install that.
501 my $docker_bin = "/usr/bin/docker.io";
502 my ($docker_locator, $docker_hash);
503 if ($docker_locator = $Job->{docker_image_locator}) {
504   $docker_hash = find_docker_hash($docker_locator);
505   if (!$docker_hash)
506   {
507     croak("No Docker image hash found from locator $docker_locator");
508   }
509   my $docker_install_script = qq{
510 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
511     arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
512 fi
513 };
514   my $docker_pid = fork();
515   if ($docker_pid == 0)
516   {
517     srun (["srun", "--nodelist=" . join(',', @node)],
518           ["/bin/sh", "-ec", $docker_install_script]);
519     exit ($?);
520   }
521   while (1)
522   {
523     last if $docker_pid == waitpid (-1, WNOHANG);
524     freeze_if_want_freeze ($docker_pid);
525     select (undef, undef, undef, 0.1);
526   }
527   if ($? != 0)
528   {
529     croak("Installing Docker image from $docker_locator returned exit code $?");
530   }
531 }
532
533 foreach (qw (script script_version script_parameters runtime_constraints))
534 {
535   Log (undef,
536        "$_ " .
537        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
538 }
539 foreach (split (/\n/, $Job->{knobs}))
540 {
541   Log (undef, "knob " . $_);
542 }
543
544
545
546 $main::success = undef;
547
548
549
550 ONELEVEL:
551
552 my $thisround_succeeded = 0;
553 my $thisround_failed = 0;
554 my $thisround_failed_multiple = 0;
555
556 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
557                        or $a <=> $b } @jobstep_todo;
558 my $level = $jobstep[$jobstep_todo[0]]->{level};
559 Log (undef, "start level $level");
560
561
562
563 my %proc;
564 my @freeslot = (0..$#slot);
565 my @holdslot;
566 my %reader;
567 my $progress_is_dirty = 1;
568 my $progress_stats_updated = 0;
569
570 update_progress_stats();
571
572
573
574 THISROUND:
575 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
576 {
577   my $id = $jobstep_todo[$todo_ptr];
578   my $Jobstep = $jobstep[$id];
579   if ($Jobstep->{level} != $level)
580   {
581     next;
582   }
583
584   pipe $reader{$id}, "writer" or croak ($!);
585   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
586   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
587
588   my $childslot = $freeslot[0];
589   my $childnode = $slot[$childslot]->{node};
590   my $childslotname = join (".",
591                             $slot[$childslot]->{node}->{name},
592                             $slot[$childslot]->{cpu});
593   my $childpid = fork();
594   if ($childpid == 0)
595   {
596     $SIG{'INT'} = 'DEFAULT';
597     $SIG{'QUIT'} = 'DEFAULT';
598     $SIG{'TERM'} = 'DEFAULT';
599
600     foreach (values (%reader))
601     {
602       close($_);
603     }
604     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
605     open(STDOUT,">&writer");
606     open(STDERR,">&writer");
607
608     undef $dbh;
609     undef $sth;
610
611     delete $ENV{"GNUPGHOME"};
612     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
613     $ENV{"TASK_QSEQUENCE"} = $id;
614     $ENV{"TASK_SEQUENCE"} = $level;
615     $ENV{"JOB_SCRIPT"} = $Job->{script};
616     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
617       $param =~ tr/a-z/A-Z/;
618       $ENV{"JOB_PARAMETER_$param"} = $value;
619     }
620     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
621     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
622     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
623     $ENV{"HOME"} = $ENV{"TASK_WORK"};
624     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
625     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
626     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
627     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
628
629     $ENV{"GZIP"} = "-n";
630
631     my @srunargs = (
632       "srun",
633       "--nodelist=".$childnode->{name},
634       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
635       "--job-name=$job_id.$id.$$",
636         );
637     my $build_script_to_send = "";
638     my $command =
639         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
640         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
641         ."&& cd $ENV{CRUNCH_TMP} ";
642     if ($build_script)
643     {
644       $build_script_to_send = $build_script;
645       $command .=
646           "&& perl -";
647     }
648     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
649     if ($docker_hash)
650     {
651       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
652       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
653       # Dynamically configure the container to use the host system as its
654       # DNS server.  Get the host's global addresses from the ip command,
655       # and turn them into docker --dns options using gawk.
656       $command .=
657           q{$(ip -o address show scope global |
658               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
659       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
660       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
661       $command .= "--env=\QHOME=/home/crunch\E ";
662       while (my ($env_key, $env_val) = each %ENV)
663       {
664         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
665           if ($env_key eq "TASK_WORK") {
666             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
667           }
668           elsif ($env_key eq "TASK_KEEPMOUNT") {
669             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
670           }
671           else {
672             $command .= "--env=\Q$env_key=$env_val\E ";
673           }
674         }
675       }
676       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
677       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
678       $command .= "\Q$docker_hash\E ";
679       $command .= "stdbuf --output=0 --error=0 ";
680       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
681     } else {
682       # Non-docker run
683       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
684       $command .= "stdbuf --output=0 --error=0 ";
685       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
686     }
687
688     my @execargs = ('bash', '-c', $command);
689     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
690     exit (111);
691   }
692   close("writer");
693   if (!defined $childpid)
694   {
695     close $reader{$id};
696     delete $reader{$id};
697     next;
698   }
699   shift @freeslot;
700   $proc{$childpid} = { jobstep => $id,
701                        time => time,
702                        slot => $childslot,
703                        jobstepname => "$job_id.$id.$childpid",
704                      };
705   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
706   $slot[$childslot]->{pid} = $childpid;
707
708   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
709   Log ($id, "child $childpid started on $childslotname");
710   $Jobstep->{starttime} = time;
711   $Jobstep->{node} = $childnode->{name};
712   $Jobstep->{slotindex} = $childslot;
713   delete $Jobstep->{stderr};
714   delete $Jobstep->{finishtime};
715
716   splice @jobstep_todo, $todo_ptr, 1;
717   --$todo_ptr;
718
719   $progress_is_dirty = 1;
720
721   while (!@freeslot
722          ||
723          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
724   {
725     last THISROUND if $main::please_freeze;
726     if ($main::please_info)
727     {
728       $main::please_info = 0;
729       freeze();
730       collate_output();
731       save_meta(1);
732       update_progress_stats();
733     }
734     my $gotsome
735         = readfrompipes ()
736         + reapchildren ();
737     if (!$gotsome)
738     {
739       check_refresh_wanted();
740       check_squeue();
741       update_progress_stats();
742       select (undef, undef, undef, 0.1);
743     }
744     elsif (time - $progress_stats_updated >= 30)
745     {
746       update_progress_stats();
747     }
748     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
749         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
750     {
751       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
752           .($thisround_failed+$thisround_succeeded)
753           .") -- giving up on this round";
754       Log (undef, $message);
755       last THISROUND;
756     }
757
758     # move slots from freeslot to holdslot (or back to freeslot) if necessary
759     for (my $i=$#freeslot; $i>=0; $i--) {
760       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
761         push @holdslot, (splice @freeslot, $i, 1);
762       }
763     }
764     for (my $i=$#holdslot; $i>=0; $i--) {
765       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
766         push @freeslot, (splice @holdslot, $i, 1);
767       }
768     }
769
770     # give up if no nodes are succeeding
771     if (!grep { $_->{node}->{losing_streak} == 0 &&
772                     $_->{node}->{hold_count} < 4 } @slot) {
773       my $message = "Every node has failed -- giving up on this round";
774       Log (undef, $message);
775       last THISROUND;
776     }
777   }
778 }
779
780
781 push @freeslot, splice @holdslot;
782 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
783
784
785 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
786 while (%proc)
787 {
788   if ($main::please_continue) {
789     $main::please_continue = 0;
790     goto THISROUND;
791   }
792   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
793   readfrompipes ();
794   if (!reapchildren())
795   {
796     check_refresh_wanted();
797     check_squeue();
798     update_progress_stats();
799     select (undef, undef, undef, 0.1);
800     killem (keys %proc) if $main::please_freeze;
801   }
802 }
803
804 update_progress_stats();
805 freeze_if_want_freeze();
806
807
808 if (!defined $main::success)
809 {
810   if (@jobstep_todo &&
811       $thisround_succeeded == 0 &&
812       ($thisround_failed == 0 || $thisround_failed > 4))
813   {
814     my $message = "stop because $thisround_failed tasks failed and none succeeded";
815     Log (undef, $message);
816     $main::success = 0;
817   }
818   if (!@jobstep_todo)
819   {
820     $main::success = 1;
821   }
822 }
823
824 goto ONELEVEL if !defined $main::success;
825
826
827 release_allocation();
828 freeze();
829 my $collated_output = &collate_output();
830
831 if ($job_has_uuid) {
832   $Job->update_attributes('running' => 0,
833                           'success' => $collated_output && $main::success,
834                           'finished_at' => scalar gmtime)
835 }
836
837 if ($collated_output)
838 {
839   eval {
840     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
841         or die "failed to get collated manifest: $!";
842     # Read the original manifest, and strip permission hints from it,
843     # so we can put the result in a Collection.
844     my @stripped_manifest_lines = ();
845     my $orig_manifest_text = '';
846     while (my $manifest_line = <$orig_manifest>) {
847       $orig_manifest_text .= $manifest_line;
848       my @words = split(/ /, $manifest_line, -1);
849       foreach my $ii (0..$#words) {
850         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
851           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
852         }
853       }
854       push(@stripped_manifest_lines, join(" ", @words));
855     }
856     my $stripped_manifest_text = join("", @stripped_manifest_lines);
857     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
858       'uuid' => md5_hex($stripped_manifest_text),
859       'manifest_text' => $orig_manifest_text,
860     });
861     $Job->update_attributes('output' => $output->{uuid});
862     if ($Job->{'output_is_persistent'}) {
863       $arv->{'links'}->{'create'}->execute('link' => {
864         'tail_kind' => 'arvados#user',
865         'tail_uuid' => $User->{'uuid'},
866         'head_kind' => 'arvados#collection',
867         'head_uuid' => $Job->{'output'},
868         'link_class' => 'resources',
869         'name' => 'wants',
870       });
871     }
872   };
873   if ($@) {
874     Log (undef, "Failed to register output manifest: $@");
875   }
876 }
877
878 Log (undef, "finish");
879
880 save_meta();
881 exit 0;
882
883
884
885 sub update_progress_stats
886 {
887   $progress_stats_updated = time;
888   return if !$progress_is_dirty;
889   my ($todo, $done, $running) = (scalar @jobstep_todo,
890                                  scalar @jobstep_done,
891                                  scalar @slot - scalar @freeslot - scalar @holdslot);
892   $Job->{'tasks_summary'} ||= {};
893   $Job->{'tasks_summary'}->{'todo'} = $todo;
894   $Job->{'tasks_summary'}->{'done'} = $done;
895   $Job->{'tasks_summary'}->{'running'} = $running;
896   if ($job_has_uuid) {
897     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
898   }
899   Log (undef, "status: $done done, $running running, $todo todo");
900   $progress_is_dirty = 0;
901 }
902
903
904
905 sub reapchildren
906 {
907   my $pid = waitpid (-1, WNOHANG);
908   return 0 if $pid <= 0;
909
910   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
911                   . "."
912                   . $slot[$proc{$pid}->{slot}]->{cpu});
913   my $jobstepid = $proc{$pid}->{jobstep};
914   my $elapsed = time - $proc{$pid}->{time};
915   my $Jobstep = $jobstep[$jobstepid];
916
917   my $childstatus = $?;
918   my $exitvalue = $childstatus >> 8;
919   my $exitinfo = sprintf("exit %d signal %d%s",
920                          $exitvalue,
921                          $childstatus & 127,
922                          ($childstatus & 128 ? ' core dump' : ''));
923   $Jobstep->{'arvados_task'}->reload;
924   my $task_success = $Jobstep->{'arvados_task'}->{success};
925
926   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
927
928   if (!defined $task_success) {
929     # task did not indicate one way or the other --> fail
930     $Jobstep->{'arvados_task'}->{success} = 0;
931     $Jobstep->{'arvados_task'}->save;
932     $task_success = 0;
933   }
934
935   if (!$task_success)
936   {
937     my $temporary_fail;
938     $temporary_fail ||= $Jobstep->{node_fail};
939     $temporary_fail ||= ($exitvalue == 111);
940
941     ++$thisround_failed;
942     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
943
944     # Check for signs of a failed or misconfigured node
945     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
946         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
947       # Don't count this against jobstep failure thresholds if this
948       # node is already suspected faulty and srun exited quickly
949       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
950           $elapsed < 5) {
951         Log ($jobstepid, "blaming failure on suspect node " .
952              $slot[$proc{$pid}->{slot}]->{node}->{name});
953         $temporary_fail ||= 1;
954       }
955       ban_node_by_slot($proc{$pid}->{slot});
956     }
957
958     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
959                              ++$Jobstep->{'failures'},
960                              $temporary_fail ? 'temporary ' : 'permanent',
961                              $elapsed));
962
963     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
964       # Give up on this task, and the whole job
965       $main::success = 0;
966       $main::please_freeze = 1;
967     }
968     else {
969       # Put this task back on the todo queue
970       push @jobstep_todo, $jobstepid;
971     }
972     $Job->{'tasks_summary'}->{'failed'}++;
973   }
974   else
975   {
976     ++$thisround_succeeded;
977     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
978     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
979     push @jobstep_done, $jobstepid;
980     Log ($jobstepid, "success in $elapsed seconds");
981   }
982   $Jobstep->{exitcode} = $childstatus;
983   $Jobstep->{finishtime} = time;
984   process_stderr ($jobstepid, $task_success);
985   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
986
987   close $reader{$jobstepid};
988   delete $reader{$jobstepid};
989   delete $slot[$proc{$pid}->{slot}]->{pid};
990   push @freeslot, $proc{$pid}->{slot};
991   delete $proc{$pid};
992
993   if ($task_success) {
994     # Load new tasks
995     my $newtask_list = [];
996     my $newtask_results;
997     do {
998       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
999         'where' => {
1000           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1001         },
1002         'order' => 'qsequence',
1003         'offset' => scalar(@$newtask_list),
1004       );
1005       push(@$newtask_list, @{$newtask_results->{items}});
1006     } while (@{$newtask_results->{items}});
1007     foreach my $arvados_task (@$newtask_list) {
1008       my $jobstep = {
1009         'level' => $arvados_task->{'sequence'},
1010         'failures' => 0,
1011         'arvados_task' => $arvados_task
1012       };
1013       push @jobstep, $jobstep;
1014       push @jobstep_todo, $#jobstep;
1015     }
1016   }
1017
1018   $progress_is_dirty = 1;
1019   1;
1020 }
1021
1022 sub check_refresh_wanted
1023 {
1024   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1025   if (@stat && $stat[9] > $latest_refresh) {
1026     $latest_refresh = scalar time;
1027     if ($job_has_uuid) {
1028       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1029       for my $attr ('cancelled_at',
1030                     'cancelled_by_user_uuid',
1031                     'cancelled_by_client_uuid') {
1032         $Job->{$attr} = $Job2->{$attr};
1033       }
1034       if ($Job->{'cancelled_at'}) {
1035         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1036              " by user " . $Job->{cancelled_by_user_uuid});
1037         $main::success = 0;
1038         $main::please_freeze = 1;
1039       }
1040     }
1041   }
1042 }
1043
1044 sub check_squeue
1045 {
1046   # return if the kill list was checked <4 seconds ago
1047   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1048   {
1049     return;
1050   }
1051   $squeue_kill_checked = time;
1052
1053   # use killem() on procs whose killtime is reached
1054   for (keys %proc)
1055   {
1056     if (exists $proc{$_}->{killtime}
1057         && $proc{$_}->{killtime} <= time)
1058     {
1059       killem ($_);
1060     }
1061   }
1062
1063   # return if the squeue was checked <60 seconds ago
1064   if (defined $squeue_checked && $squeue_checked > time - 60)
1065   {
1066     return;
1067   }
1068   $squeue_checked = time;
1069
1070   if (!$have_slurm)
1071   {
1072     # here is an opportunity to check for mysterious problems with local procs
1073     return;
1074   }
1075
1076   # get a list of steps still running
1077   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1078   chop @squeue;
1079   if ($squeue[-1] ne "ok")
1080   {
1081     return;
1082   }
1083   pop @squeue;
1084
1085   # which of my jobsteps are running, according to squeue?
1086   my %ok;
1087   foreach (@squeue)
1088   {
1089     if (/^(\d+)\.(\d+) (\S+)/)
1090     {
1091       if ($1 eq $ENV{SLURM_JOBID})
1092       {
1093         $ok{$3} = 1;
1094       }
1095     }
1096   }
1097
1098   # which of my active child procs (>60s old) were not mentioned by squeue?
1099   foreach (keys %proc)
1100   {
1101     if ($proc{$_}->{time} < time - 60
1102         && !exists $ok{$proc{$_}->{jobstepname}}
1103         && !exists $proc{$_}->{killtime})
1104     {
1105       # kill this proc if it hasn't exited in 30 seconds
1106       $proc{$_}->{killtime} = time + 30;
1107     }
1108   }
1109 }
1110
1111
1112 sub release_allocation
1113 {
1114   if ($have_slurm)
1115   {
1116     Log (undef, "release job allocation");
1117     system "scancel $ENV{SLURM_JOBID}";
1118   }
1119 }
1120
1121
1122 sub readfrompipes
1123 {
1124   my $gotsome = 0;
1125   foreach my $job (keys %reader)
1126   {
1127     my $buf;
1128     while (0 < sysread ($reader{$job}, $buf, 8192))
1129     {
1130       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1131       $jobstep[$job]->{stderr} .= $buf;
1132       preprocess_stderr ($job);
1133       if (length ($jobstep[$job]->{stderr}) > 16384)
1134       {
1135         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1136       }
1137       $gotsome = 1;
1138     }
1139   }
1140   return $gotsome;
1141 }
1142
1143
1144 sub preprocess_stderr
1145 {
1146   my $job = shift;
1147
1148   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1149     my $line = $1;
1150     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1151     Log ($job, "stderr $line");
1152     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1153       # whoa.
1154       $main::please_freeze = 1;
1155     }
1156     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1157       $jobstep[$job]->{node_fail} = 1;
1158       ban_node_by_slot($jobstep[$job]->{slotindex});
1159     }
1160   }
1161 }
1162
1163
1164 sub process_stderr
1165 {
1166   my $job = shift;
1167   my $task_success = shift;
1168   preprocess_stderr ($job);
1169
1170   map {
1171     Log ($job, "stderr $_");
1172   } split ("\n", $jobstep[$job]->{stderr});
1173 }
1174
1175 sub fetch_block
1176 {
1177   my $hash = shift;
1178   my ($keep, $child_out, $output_block);
1179
1180   my $cmd = "arv-get \Q$hash\E";
1181   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1182   $output_block = '';
1183   while (1) {
1184     my $buf;
1185     my $bytes = sysread($keep, $buf, 1024 * 1024);
1186     if (!defined $bytes) {
1187       die "reading from arv-get: $!";
1188     } elsif ($bytes == 0) {
1189       # sysread returns 0 at the end of the pipe.
1190       last;
1191     } else {
1192       # some bytes were read into buf.
1193       $output_block .= $buf;
1194     }
1195   }
1196   close $keep;
1197   return $output_block;
1198 }
1199
1200 sub collate_output
1201 {
1202   Log (undef, "collate");
1203
1204   my ($child_out, $child_in);
1205   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1206   my $joboutput;
1207   for (@jobstep)
1208   {
1209     next if (!exists $_->{'arvados_task'}->{output} ||
1210              !$_->{'arvados_task'}->{'success'} ||
1211              $_->{'exitcode'} != 0);
1212     my $output = $_->{'arvados_task'}->{output};
1213     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1214     {
1215       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1216       print $child_in $output;
1217     }
1218     elsif (@jobstep == 1)
1219     {
1220       $joboutput = $output;
1221       last;
1222     }
1223     elsif (defined (my $outblock = fetch_block ($output)))
1224     {
1225       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1226       print $child_in $outblock;
1227     }
1228     else
1229     {
1230       Log (undef, "XXX fetch_block($output) failed XXX");
1231       $main::success = 0;
1232     }
1233   }
1234   $child_in->close;
1235
1236   if (!defined $joboutput) {
1237     my $s = IO::Select->new($child_out);
1238     if ($s->can_read(120)) {
1239       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1240       chomp($joboutput);
1241     } else {
1242       Log (undef, "timed out reading from 'arv-put'");
1243     }
1244   }
1245   waitpid($pid, 0);
1246
1247   if ($joboutput)
1248   {
1249     Log (undef, "output $joboutput");
1250     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1251   }
1252   else
1253   {
1254     Log (undef, "output undef");
1255   }
1256   return $joboutput;
1257 }
1258
1259
1260 sub killem
1261 {
1262   foreach (@_)
1263   {
1264     my $sig = 2;                # SIGINT first
1265     if (exists $proc{$_}->{"sent_$sig"} &&
1266         time - $proc{$_}->{"sent_$sig"} > 4)
1267     {
1268       $sig = 15;                # SIGTERM if SIGINT doesn't work
1269     }
1270     if (exists $proc{$_}->{"sent_$sig"} &&
1271         time - $proc{$_}->{"sent_$sig"} > 4)
1272     {
1273       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1274     }
1275     if (!exists $proc{$_}->{"sent_$sig"})
1276     {
1277       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1278       kill $sig, $_;
1279       select (undef, undef, undef, 0.1);
1280       if ($sig == 2)
1281       {
1282         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1283       }
1284       $proc{$_}->{"sent_$sig"} = time;
1285       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1286     }
1287   }
1288 }
1289
1290
1291 sub fhbits
1292 {
1293   my($bits);
1294   for (@_) {
1295     vec($bits,fileno($_),1) = 1;
1296   }
1297   $bits;
1298 }
1299
1300
1301 sub Log                         # ($jobstep_id, $logmessage)
1302 {
1303   if ($_[1] =~ /\n/) {
1304     for my $line (split (/\n/, $_[1])) {
1305       Log ($_[0], $line);
1306     }
1307     return;
1308   }
1309   my $fh = select STDERR; $|=1; select $fh;
1310   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1311   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1312   $message .= "\n";
1313   my $datetime;
1314   if ($local_logfile || -t STDERR) {
1315     my @gmtime = gmtime;
1316     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1317                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1318   }
1319   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1320
1321   if ($local_logfile) {
1322     print $local_logfile $datetime . " " . $message;
1323   }
1324 }
1325
1326
1327 sub croak
1328 {
1329   my ($package, $file, $line) = caller;
1330   my $message = "@_ at $file line $line\n";
1331   Log (undef, $message);
1332   freeze() if @jobstep_todo;
1333   collate_output() if @jobstep_todo;
1334   cleanup();
1335   save_meta() if $local_logfile;
1336   die;
1337 }
1338
1339
1340 sub cleanup
1341 {
1342   return if !$job_has_uuid;
1343   $Job->update_attributes('running' => 0,
1344                           'success' => 0,
1345                           'finished_at' => scalar gmtime);
1346 }
1347
1348
1349 sub save_meta
1350 {
1351   my $justcheckpoint = shift; # false if this will be the last meta saved
1352   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1353
1354   $local_logfile->flush;
1355   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1356       . quotemeta($local_logfile->filename);
1357   my $loglocator = `$cmd`;
1358   die "system $cmd failed: $?" if $?;
1359   chomp($loglocator);
1360
1361   $local_logfile = undef;   # the temp file is automatically deleted
1362   Log (undef, "log manifest is $loglocator");
1363   $Job->{'log'} = $loglocator;
1364   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1365 }
1366
1367
1368 sub freeze_if_want_freeze
1369 {
1370   if ($main::please_freeze)
1371   {
1372     release_allocation();
1373     if (@_)
1374     {
1375       # kill some srun procs before freeze+stop
1376       map { $proc{$_} = {} } @_;
1377       while (%proc)
1378       {
1379         killem (keys %proc);
1380         select (undef, undef, undef, 0.1);
1381         my $died;
1382         while (($died = waitpid (-1, WNOHANG)) > 0)
1383         {
1384           delete $proc{$died};
1385         }
1386       }
1387     }
1388     freeze();
1389     collate_output();
1390     cleanup();
1391     save_meta();
1392     exit 0;
1393   }
1394 }
1395
1396
1397 sub freeze
1398 {
1399   Log (undef, "Freeze not implemented");
1400   return;
1401 }
1402
1403
1404 sub thaw
1405 {
1406   croak ("Thaw not implemented");
1407 }
1408
1409
1410 sub freezequote
1411 {
1412   my $s = shift;
1413   $s =~ s/\\/\\\\/g;
1414   $s =~ s/\n/\\n/g;
1415   return $s;
1416 }
1417
1418
1419 sub freezeunquote
1420 {
1421   my $s = shift;
1422   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1423   return $s;
1424 }
1425
1426
1427 sub srun
1428 {
1429   my $srunargs = shift;
1430   my $execargs = shift;
1431   my $opts = shift || {};
1432   my $stdin = shift;
1433   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1434   print STDERR (join (" ",
1435                       map { / / ? "'$_'" : $_ }
1436                       (@$args)),
1437                 "\n")
1438       if $ENV{CRUNCH_DEBUG};
1439
1440   if (defined $stdin) {
1441     my $child = open STDIN, "-|";
1442     defined $child or die "no fork: $!";
1443     if ($child == 0) {
1444       print $stdin or die $!;
1445       close STDOUT or die $!;
1446       exit 0;
1447     }
1448   }
1449
1450   return system (@$args) if $opts->{fork};
1451
1452   exec @$args;
1453   warn "ENV size is ".length(join(" ",%ENV));
1454   die "exec failed: $!: @$args";
1455 }
1456
1457
1458 sub ban_node_by_slot {
1459   # Don't start any new jobsteps on this node for 60 seconds
1460   my $slotid = shift;
1461   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1462   $slot[$slotid]->{node}->{hold_count}++;
1463   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1464 }
1465
1466 sub must_lock_now
1467 {
1468   my ($lockfile, $error_message) = @_;
1469   open L, ">", $lockfile or croak("$lockfile: $!");
1470   if (!flock L, LOCK_EX|LOCK_NB) {
1471     croak("Can't lock $lockfile: $error_message\n");
1472   }
1473 }
1474
1475 sub find_docker_hash {
1476   # Given a Keep locator, search for a matching link to find the Docker hash
1477   # of the stored image.
1478   my $locator = shift;
1479   my $links_result = $arv->{links}->{list}->execute(
1480     filters => [["head_uuid", "=", $locator],
1481                 ["link_class", "=", "docker_image_hash"]],
1482     limit => 1);
1483   my $docker_hash;
1484   foreach my $link (@{$links_result->{items}}) {
1485     $docker_hash = lc($link->{name});
1486   }
1487   return $docker_hash;
1488 }
1489
1490 __DATA__
1491 #!/usr/bin/perl
1492
1493 # checkout-and-build
1494
1495 use Fcntl ':flock';
1496
1497 my $destdir = $ENV{"CRUNCH_SRC"};
1498 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1499 my $repo = $ENV{"CRUNCH_SRC_URL"};
1500
1501 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1502 flock L, LOCK_EX;
1503 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1504     exit 0;
1505 }
1506
1507 unlink "$destdir.commit";
1508 open STDOUT, ">", "$destdir.log";
1509 open STDERR, ">&STDOUT";
1510
1511 mkdir $destdir;
1512 my @git_archive_data = <DATA>;
1513 if (@git_archive_data) {
1514   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1515   print TARX @git_archive_data;
1516   if(!close(TARX)) {
1517     die "'tar -C $destdir -xf -' exited $?: $!";
1518   }
1519 }
1520
1521 my $pwd;
1522 chomp ($pwd = `pwd`);
1523 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1524 mkdir $install_dir;
1525
1526 for my $src_path ("$destdir/arvados/sdk/python") {
1527   if (-d $src_path) {
1528     shell_or_die ("virtualenv", $install_dir);
1529     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1530   }
1531 }
1532
1533 if (-e "$destdir/crunch_scripts/install") {
1534     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1535 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1536     # Old version
1537     shell_or_die ("./tests/autotests.sh", $install_dir);
1538 } elsif (-e "./install.sh") {
1539     shell_or_die ("./install.sh", $install_dir);
1540 }
1541
1542 if ($commit) {
1543     unlink "$destdir.commit.new";
1544     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1545     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1546 }
1547
1548 close L;
1549
1550 exit 0;
1551
1552 sub shell_or_die
1553 {
1554   if ($ENV{"DEBUG"}) {
1555     print STDERR "@_\n";
1556   }
1557   system (@_) == 0
1558       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1559 }
1560
1561 __DATA__