2755: Fix handling of stripped/orig manifest text.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $arv_cli;
100
101 if (defined $ENV{"ARV_CLI"}) {
102   $arv_cli = $ENV{"ARV_CLI"};
103 }
104 else {
105   $arv_cli = 'arv';
106 }
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     if ($Job->{'is_locked_by_uuid'}) {
156       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157     }
158     if ($Job->{'success'} ne undef) {
159       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160     }
161     if ($Job->{'running'}) {
162       croak("Job 'running' flag is already set");
163     }
164     if ($Job->{'started_at'}) {
165       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
166     }
167   }
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172
173   if (!$resume_stash)
174   {
175     map { croak ("No $_ specified") unless $Job->{$_} }
176     qw(script script_version script_parameters);
177   }
178
179   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180   $Job->{'started_at'} = gmtime;
181
182   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183
184   $job_has_uuid = 1;
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271
272 my $jobmanager_id;
273 if ($job_has_uuid)
274 {
275   # Claim this job, and make sure nobody else does
276   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278     croak("Error while updating / locking job");
279   }
280   $Job->update_attributes('started_at' => scalar gmtime,
281                           'running' => 1,
282                           'success' => undef,
283                           'tasks_summary' => { 'failed' => 0,
284                                                'todo' => 1,
285                                                'running' => 0,
286                                                'done' => 0 });
287 }
288
289
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
304
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
308
309
310 my @jobstep;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
315 my $squeue_checked;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
319
320
321
322 if (defined $Job->{thawedfromkey})
323 {
324   thaw ($Job->{thawedfromkey});
325 }
326 else
327 {
328   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329     'job_uuid' => $Job->{'uuid'},
330     'sequence' => 0,
331     'qsequence' => 0,
332     'parameters' => {},
333                                                           });
334   push @jobstep, { 'level' => 0,
335                    'failures' => 0,
336                    'arvados_task' => $first_task,
337                  };
338   push @jobstep_todo, 0;
339 }
340
341
342 if (!$have_slurm)
343 {
344   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
345 }
346
347
348 my $build_script;
349
350
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if ($skip_install)
355 {
356   if (!defined $no_clear_tmp) {
357     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358     system($clear_tmp_cmd) == 0
359         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360   }
361   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363     if (-d $src_path) {
364       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367           == 0
368           or croak ("setup.py in $src_path failed: exit ".($?>>8));
369     }
370   }
371 }
372 else
373 {
374   do {
375     local $/ = undef;
376     $build_script = <DATA>;
377   };
378   Log (undef, "Install revision ".$Job->{script_version});
379   my $nodelist = join(",", @node);
380
381   if (!defined $no_clear_tmp) {
382     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383
384     my $cleanpid = fork();
385     if ($cleanpid == 0)
386     {
387       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
389       exit (1);
390     }
391     while (1)
392     {
393       last if $cleanpid == waitpid (-1, WNOHANG);
394       freeze_if_want_freeze ($cleanpid);
395       select (undef, undef, undef, 0.1);
396     }
397     Log (undef, "Clean-work-dir exited $?");
398   }
399
400   # Install requested code version
401
402   my @execargs;
403   my @srunargs = ("srun",
404                   "--nodelist=$nodelist",
405                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406
407   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
409
410   my $commit;
411   my $git_archive;
412   my $treeish = $Job->{'script_version'};
413
414   # If we're running under crunch-dispatch, it will have pulled the
415   # appropriate source tree into its own repository, and given us that
416   # repo's path as $git_dir. If we're running a "local" job, and a
417   # script_version was specified, it's up to the user to provide the
418   # full path to a local repository in Job->{repository}.
419   #
420   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421   # git-archive --remote where appropriate.
422   #
423   # TODO: Accept a locally-hosted Arvados repository by name or
424   # UUID. Use arvados.v1.repositories.list or .get to figure out the
425   # appropriate fetch-url.
426   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427
428   $ENV{"CRUNCH_SRC_URL"} = $repo;
429
430   if (-d "$repo/.git") {
431     # We were given a working directory, but we are only interested in
432     # the index.
433     $repo = "$repo/.git";
434   }
435
436   # If this looks like a subversion r#, look for it in git-svn commit messages
437
438   if ($treeish =~ m{^\d{1,4}$}) {
439     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440     chomp $gitlog;
441     if ($gitlog =~ /^[a-f0-9]{40}$/) {
442       $commit = $gitlog;
443       Log (undef, "Using commit $commit for script_version $treeish");
444     }
445   }
446
447   # If that didn't work, try asking git to look it up as a tree-ish.
448
449   if (!defined $commit) {
450     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451     chomp $found;
452     if ($found =~ /^[0-9a-f]{40}$/s) {
453       $commit = $found;
454       if ($commit ne $treeish) {
455         # Make sure we record the real commit id in the database,
456         # frozentokey, logs, etc. -- instead of an abbreviation or a
457         # branch name which can become ambiguous or point to a
458         # different commit in the future.
459         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460         Log (undef, "Using commit $commit for tree-ish $treeish");
461         if ($commit ne $treeish) {
462           $Job->{'script_version'} = $commit;
463           !$job_has_uuid or
464               $Job->update_attributes('script_version' => $commit) or
465               croak("Error while updating job");
466         }
467       }
468     }
469   }
470
471   if (defined $commit) {
472     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473     @execargs = ("sh", "-c",
474                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476   }
477   else {
478     croak ("could not figure out commit id for $treeish");
479   }
480
481   my $installpid = fork();
482   if ($installpid == 0)
483   {
484     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
485     exit (1);
486   }
487   while (1)
488   {
489     last if $installpid == waitpid (-1, WNOHANG);
490     freeze_if_want_freeze ($installpid);
491     select (undef, undef, undef, 0.1);
492   }
493   Log (undef, "Install exited $?");
494 }
495
496 if (!$have_slurm)
497 {
498   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 }
501
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";
505 if ($docker_image) {
506   my $docker_pid = fork();
507   if ($docker_pid == 0)
508   {
509     srun (["srun", "--nodelist=" . join(' ', @node)],
510           [$docker_bin, 'pull', $docker_image]);
511     exit ($?);
512   }
513   while (1)
514   {
515     last if $docker_pid == waitpid (-1, WNOHANG);
516     freeze_if_want_freeze ($docker_pid);
517     select (undef, undef, undef, 0.1);
518   }
519   # If the Docker image was specified as a hash, pull will fail.
520   # Ignore that error.  We'll see what happens when we try to run later.
521   if (($? != 0) && ($docker_image !~ /^[0-9a-fA-F]{5,64}$/))
522   {
523     croak("Installing Docker image $docker_image returned exit code $?");
524   }
525 }
526
527 foreach (qw (script script_version script_parameters runtime_constraints))
528 {
529   Log (undef,
530        "$_ " .
531        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
532 }
533 foreach (split (/\n/, $Job->{knobs}))
534 {
535   Log (undef, "knob " . $_);
536 }
537
538
539
540 $main::success = undef;
541
542
543
544 ONELEVEL:
545
546 my $thisround_succeeded = 0;
547 my $thisround_failed = 0;
548 my $thisround_failed_multiple = 0;
549
550 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
551                        or $a <=> $b } @jobstep_todo;
552 my $level = $jobstep[$jobstep_todo[0]]->{level};
553 Log (undef, "start level $level");
554
555
556
557 my %proc;
558 my @freeslot = (0..$#slot);
559 my @holdslot;
560 my %reader;
561 my $progress_is_dirty = 1;
562 my $progress_stats_updated = 0;
563
564 update_progress_stats();
565
566
567
568 THISROUND:
569 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
570 {
571   my $id = $jobstep_todo[$todo_ptr];
572   my $Jobstep = $jobstep[$id];
573   if ($Jobstep->{level} != $level)
574   {
575     next;
576   }
577
578   pipe $reader{$id}, "writer" or croak ($!);
579   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
580   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
581
582   my $childslot = $freeslot[0];
583   my $childnode = $slot[$childslot]->{node};
584   my $childslotname = join (".",
585                             $slot[$childslot]->{node}->{name},
586                             $slot[$childslot]->{cpu});
587   my $childpid = fork();
588   if ($childpid == 0)
589   {
590     $SIG{'INT'} = 'DEFAULT';
591     $SIG{'QUIT'} = 'DEFAULT';
592     $SIG{'TERM'} = 'DEFAULT';
593
594     foreach (values (%reader))
595     {
596       close($_);
597     }
598     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
599     open(STDOUT,">&writer");
600     open(STDERR,">&writer");
601
602     undef $dbh;
603     undef $sth;
604
605     delete $ENV{"GNUPGHOME"};
606     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
607     $ENV{"TASK_QSEQUENCE"} = $id;
608     $ENV{"TASK_SEQUENCE"} = $level;
609     $ENV{"JOB_SCRIPT"} = $Job->{script};
610     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
611       $param =~ tr/a-z/A-Z/;
612       $ENV{"JOB_PARAMETER_$param"} = $value;
613     }
614     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
615     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
616     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
617     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
618     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
619     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
620     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
621
622     $ENV{"GZIP"} = "-n";
623
624     my @srunargs = (
625       "srun",
626       "--nodelist=".$childnode->{name},
627       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
628       "--job-name=$job_id.$id.$$",
629         );
630     my $build_script_to_send = "";
631     my $command =
632         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
633         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
634         ."&& cd $ENV{CRUNCH_TMP} ";
635     if ($build_script)
636     {
637       $build_script_to_send = $build_script;
638       $command .=
639           "&& perl -";
640     }
641     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
642     if ($docker_image)
643     {
644       $command .= "crunchstat -cgroup-parent=/sys/fs/cgroup/lxc -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=1000 ";
645       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr -cidfile=$ENV{TASK_WORK}/docker.cid ";
646       # Dynamically configure the container to use the host system as its
647       # DNS server.  Get the host's global addresses from the ip command,
648       # and turn them into docker --dns options using gawk.
649       $command .=
650           q{$(ip -o address show scope global |
651               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
652       foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
653       {
654         $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
655       }
656       while (my ($env_key, $env_val) = each %ENV)
657       {
658         if ($env_key =~ /^(JOB|TASK)_/) {
659           $command .= "-e \Q$env_key=$env_val\E ";
660         }
661       }
662       $command .= "\Q$docker_image\E ";
663     } else {
664       $command .= "crunchstat -cgroup-path=/sys/fs/cgroup "
665     }
666     $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
667     my @execargs = ('bash', '-c', $command);
668     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
669     exit (111);
670   }
671   close("writer");
672   if (!defined $childpid)
673   {
674     close $reader{$id};
675     delete $reader{$id};
676     next;
677   }
678   shift @freeslot;
679   $proc{$childpid} = { jobstep => $id,
680                        time => time,
681                        slot => $childslot,
682                        jobstepname => "$job_id.$id.$childpid",
683                      };
684   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
685   $slot[$childslot]->{pid} = $childpid;
686
687   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
688   Log ($id, "child $childpid started on $childslotname");
689   $Jobstep->{starttime} = time;
690   $Jobstep->{node} = $childnode->{name};
691   $Jobstep->{slotindex} = $childslot;
692   delete $Jobstep->{stderr};
693   delete $Jobstep->{finishtime};
694
695   splice @jobstep_todo, $todo_ptr, 1;
696   --$todo_ptr;
697
698   $progress_is_dirty = 1;
699
700   while (!@freeslot
701          ||
702          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
703   {
704     last THISROUND if $main::please_freeze;
705     if ($main::please_info)
706     {
707       $main::please_info = 0;
708       freeze();
709       collate_output();
710       save_meta(1);
711       update_progress_stats();
712     }
713     my $gotsome
714         = readfrompipes ()
715         + reapchildren ();
716     if (!$gotsome)
717     {
718       check_refresh_wanted();
719       check_squeue();
720       update_progress_stats();
721       select (undef, undef, undef, 0.1);
722     }
723     elsif (time - $progress_stats_updated >= 30)
724     {
725       update_progress_stats();
726     }
727     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
728         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
729     {
730       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
731           .($thisround_failed+$thisround_succeeded)
732           .") -- giving up on this round";
733       Log (undef, $message);
734       last THISROUND;
735     }
736
737     # move slots from freeslot to holdslot (or back to freeslot) if necessary
738     for (my $i=$#freeslot; $i>=0; $i--) {
739       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
740         push @holdslot, (splice @freeslot, $i, 1);
741       }
742     }
743     for (my $i=$#holdslot; $i>=0; $i--) {
744       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
745         push @freeslot, (splice @holdslot, $i, 1);
746       }
747     }
748
749     # give up if no nodes are succeeding
750     if (!grep { $_->{node}->{losing_streak} == 0 &&
751                     $_->{node}->{hold_count} < 4 } @slot) {
752       my $message = "Every node has failed -- giving up on this round";
753       Log (undef, $message);
754       last THISROUND;
755     }
756   }
757 }
758
759
760 push @freeslot, splice @holdslot;
761 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
762
763
764 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
765 while (%proc)
766 {
767   if ($main::please_continue) {
768     $main::please_continue = 0;
769     goto THISROUND;
770   }
771   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
772   readfrompipes ();
773   if (!reapchildren())
774   {
775     check_refresh_wanted();
776     check_squeue();
777     update_progress_stats();
778     select (undef, undef, undef, 0.1);
779     killem (keys %proc) if $main::please_freeze;
780   }
781 }
782
783 update_progress_stats();
784 freeze_if_want_freeze();
785
786
787 if (!defined $main::success)
788 {
789   if (@jobstep_todo &&
790       $thisround_succeeded == 0 &&
791       ($thisround_failed == 0 || $thisround_failed > 4))
792   {
793     my $message = "stop because $thisround_failed tasks failed and none succeeded";
794     Log (undef, $message);
795     $main::success = 0;
796   }
797   if (!@jobstep_todo)
798   {
799     $main::success = 1;
800   }
801 }
802
803 goto ONELEVEL if !defined $main::success;
804
805
806 release_allocation();
807 freeze();
808 my $collated_output = &collate_output();
809
810 if ($job_has_uuid) {
811   $Job->update_attributes('running' => 0,
812                           'success' => $collated_output && $main::success,
813                           'finished_at' => scalar gmtime)
814 }
815
816 if ($collated_output)
817 {
818   eval {
819     open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
820         or die "failed to get collated manifest: $!";
821     # Read the original manifest, and strip permission hints from it,
822     # so we can put the result in a Collection.
823     my @stripped_manifest_lines = ();
824     my $orig_manifest_text = '';
825     while (my $manifest_line = <$orig_manifest>) {
826       $orig_manifest_text .= $manifest_line;
827       my @words = split(/ /, $manifest_line, -1);
828       foreach my $ii (0..$#words) {
829         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
830           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
831         }
832       }
833       push(@stripped_manifest_lines, join(" ", @words));
834     }
835     my $stripped_manifest_text = join("", @stripped_manifest_lines);
836     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
837       'uuid' => md5_hex($stripped_manifest_text),
838       'manifest_text' => $orig_manifest_text,
839     });
840     $Job->update_attributes('output' => $output->{uuid});
841     if ($Job->{'output_is_persistent'}) {
842       $arv->{'links'}->{'create'}->execute('link' => {
843         'tail_kind' => 'arvados#user',
844         'tail_uuid' => $User->{'uuid'},
845         'head_kind' => 'arvados#collection',
846         'head_uuid' => $Job->{'output'},
847         'link_class' => 'resources',
848         'name' => 'wants',
849       });
850     }
851   };
852   if ($@) {
853     Log (undef, "Failed to register output manifest: $@");
854   }
855 }
856
857 Log (undef, "finish");
858
859 save_meta();
860 exit 0;
861
862
863
864 sub update_progress_stats
865 {
866   $progress_stats_updated = time;
867   return if !$progress_is_dirty;
868   my ($todo, $done, $running) = (scalar @jobstep_todo,
869                                  scalar @jobstep_done,
870                                  scalar @slot - scalar @freeslot - scalar @holdslot);
871   $Job->{'tasks_summary'} ||= {};
872   $Job->{'tasks_summary'}->{'todo'} = $todo;
873   $Job->{'tasks_summary'}->{'done'} = $done;
874   $Job->{'tasks_summary'}->{'running'} = $running;
875   if ($job_has_uuid) {
876     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
877   }
878   Log (undef, "status: $done done, $running running, $todo todo");
879   $progress_is_dirty = 0;
880 }
881
882
883
884 sub reapchildren
885 {
886   my $pid = waitpid (-1, WNOHANG);
887   return 0 if $pid <= 0;
888
889   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
890                   . "."
891                   . $slot[$proc{$pid}->{slot}]->{cpu});
892   my $jobstepid = $proc{$pid}->{jobstep};
893   my $elapsed = time - $proc{$pid}->{time};
894   my $Jobstep = $jobstep[$jobstepid];
895
896   my $childstatus = $?;
897   my $exitvalue = $childstatus >> 8;
898   my $exitinfo = sprintf("exit %d signal %d%s",
899                          $exitvalue,
900                          $childstatus & 127,
901                          ($childstatus & 128 ? ' core dump' : ''));
902   $Jobstep->{'arvados_task'}->reload;
903   my $task_success = $Jobstep->{'arvados_task'}->{success};
904
905   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
906
907   if (!defined $task_success) {
908     # task did not indicate one way or the other --> fail
909     $Jobstep->{'arvados_task'}->{success} = 0;
910     $Jobstep->{'arvados_task'}->save;
911     $task_success = 0;
912   }
913
914   if (!$task_success)
915   {
916     my $temporary_fail;
917     $temporary_fail ||= $Jobstep->{node_fail};
918     $temporary_fail ||= ($exitvalue == 111);
919
920     ++$thisround_failed;
921     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
922
923     # Check for signs of a failed or misconfigured node
924     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
925         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
926       # Don't count this against jobstep failure thresholds if this
927       # node is already suspected faulty and srun exited quickly
928       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
929           $elapsed < 5) {
930         Log ($jobstepid, "blaming failure on suspect node " .
931              $slot[$proc{$pid}->{slot}]->{node}->{name});
932         $temporary_fail ||= 1;
933       }
934       ban_node_by_slot($proc{$pid}->{slot});
935     }
936
937     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
938                              ++$Jobstep->{'failures'},
939                              $temporary_fail ? 'temporary ' : 'permanent',
940                              $elapsed));
941
942     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
943       # Give up on this task, and the whole job
944       $main::success = 0;
945       $main::please_freeze = 1;
946     }
947     else {
948       # Put this task back on the todo queue
949       push @jobstep_todo, $jobstepid;
950     }
951     $Job->{'tasks_summary'}->{'failed'}++;
952   }
953   else
954   {
955     ++$thisround_succeeded;
956     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
957     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
958     push @jobstep_done, $jobstepid;
959     Log ($jobstepid, "success in $elapsed seconds");
960   }
961   $Jobstep->{exitcode} = $childstatus;
962   $Jobstep->{finishtime} = time;
963   process_stderr ($jobstepid, $task_success);
964   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
965
966   close $reader{$jobstepid};
967   delete $reader{$jobstepid};
968   delete $slot[$proc{$pid}->{slot}]->{pid};
969   push @freeslot, $proc{$pid}->{slot};
970   delete $proc{$pid};
971
972   # Load new tasks
973   my $newtask_list = [];
974   my $newtask_results;
975   do {
976     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
977       'where' => {
978         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
979       },
980       'order' => 'qsequence',
981       'offset' => scalar(@$newtask_list),
982     );
983     push(@$newtask_list, @{$newtask_results->{items}});
984   } while (@{$newtask_results->{items}});
985   foreach my $arvados_task (@$newtask_list) {
986     my $jobstep = {
987       'level' => $arvados_task->{'sequence'},
988       'failures' => 0,
989       'arvados_task' => $arvados_task
990     };
991     push @jobstep, $jobstep;
992     push @jobstep_todo, $#jobstep;
993   }
994
995   $progress_is_dirty = 1;
996   1;
997 }
998
999 sub check_refresh_wanted
1000 {
1001   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1002   if (@stat && $stat[9] > $latest_refresh) {
1003     $latest_refresh = scalar time;
1004     if ($job_has_uuid) {
1005       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1006       for my $attr ('cancelled_at',
1007                     'cancelled_by_user_uuid',
1008                     'cancelled_by_client_uuid') {
1009         $Job->{$attr} = $Job2->{$attr};
1010       }
1011       if ($Job->{'cancelled_at'}) {
1012         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1013              " by user " . $Job->{cancelled_by_user_uuid});
1014         $main::success = 0;
1015         $main::please_freeze = 1;
1016       }
1017     }
1018   }
1019 }
1020
1021 sub check_squeue
1022 {
1023   # return if the kill list was checked <4 seconds ago
1024   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1025   {
1026     return;
1027   }
1028   $squeue_kill_checked = time;
1029
1030   # use killem() on procs whose killtime is reached
1031   for (keys %proc)
1032   {
1033     if (exists $proc{$_}->{killtime}
1034         && $proc{$_}->{killtime} <= time)
1035     {
1036       killem ($_);
1037     }
1038   }
1039
1040   # return if the squeue was checked <60 seconds ago
1041   if (defined $squeue_checked && $squeue_checked > time - 60)
1042   {
1043     return;
1044   }
1045   $squeue_checked = time;
1046
1047   if (!$have_slurm)
1048   {
1049     # here is an opportunity to check for mysterious problems with local procs
1050     return;
1051   }
1052
1053   # get a list of steps still running
1054   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1055   chop @squeue;
1056   if ($squeue[-1] ne "ok")
1057   {
1058     return;
1059   }
1060   pop @squeue;
1061
1062   # which of my jobsteps are running, according to squeue?
1063   my %ok;
1064   foreach (@squeue)
1065   {
1066     if (/^(\d+)\.(\d+) (\S+)/)
1067     {
1068       if ($1 eq $ENV{SLURM_JOBID})
1069       {
1070         $ok{$3} = 1;
1071       }
1072     }
1073   }
1074
1075   # which of my active child procs (>60s old) were not mentioned by squeue?
1076   foreach (keys %proc)
1077   {
1078     if ($proc{$_}->{time} < time - 60
1079         && !exists $ok{$proc{$_}->{jobstepname}}
1080         && !exists $proc{$_}->{killtime})
1081     {
1082       # kill this proc if it hasn't exited in 30 seconds
1083       $proc{$_}->{killtime} = time + 30;
1084     }
1085   }
1086 }
1087
1088
1089 sub release_allocation
1090 {
1091   if ($have_slurm)
1092   {
1093     Log (undef, "release job allocation");
1094     system "scancel $ENV{SLURM_JOBID}";
1095   }
1096 }
1097
1098
1099 sub readfrompipes
1100 {
1101   my $gotsome = 0;
1102   foreach my $job (keys %reader)
1103   {
1104     my $buf;
1105     while (0 < sysread ($reader{$job}, $buf, 8192))
1106     {
1107       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1108       $jobstep[$job]->{stderr} .= $buf;
1109       preprocess_stderr ($job);
1110       if (length ($jobstep[$job]->{stderr}) > 16384)
1111       {
1112         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1113       }
1114       $gotsome = 1;
1115     }
1116   }
1117   return $gotsome;
1118 }
1119
1120
1121 sub preprocess_stderr
1122 {
1123   my $job = shift;
1124
1125   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1126     my $line = $1;
1127     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1128     Log ($job, "stderr $line");
1129     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1130       # whoa.
1131       $main::please_freeze = 1;
1132     }
1133     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1134       $jobstep[$job]->{node_fail} = 1;
1135       ban_node_by_slot($jobstep[$job]->{slotindex});
1136     }
1137   }
1138 }
1139
1140
1141 sub process_stderr
1142 {
1143   my $job = shift;
1144   my $task_success = shift;
1145   preprocess_stderr ($job);
1146
1147   map {
1148     Log ($job, "stderr $_");
1149   } split ("\n", $jobstep[$job]->{stderr});
1150 }
1151
1152 sub fetch_block
1153 {
1154   my $hash = shift;
1155   my ($keep, $child_out, $output_block);
1156
1157   my $cmd = "$arv_cli keep get \Q$hash\E";
1158   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1159   sysread($keep, $output_block, 64 * 1024 * 1024);
1160   close $keep;
1161   return $output_block;
1162 }
1163
1164 sub collate_output
1165 {
1166   Log (undef, "collate");
1167
1168   my ($child_out, $child_in);
1169   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1170   my $joboutput;
1171   for (@jobstep)
1172   {
1173     next if (!exists $_->{'arvados_task'}->{output} ||
1174              !$_->{'arvados_task'}->{'success'} ||
1175              $_->{'exitcode'} != 0);
1176     my $output = $_->{'arvados_task'}->{output};
1177     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1178     {
1179       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1180       print $child_in $output;
1181     }
1182     elsif (@jobstep == 1)
1183     {
1184       $joboutput = $output;
1185       last;
1186     }
1187     elsif (defined (my $outblock = fetch_block ($output)))
1188     {
1189       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1190       print $child_in $outblock;
1191     }
1192     else
1193     {
1194       Log (undef, "XXX fetch_block($output) failed XXX");
1195       $main::success = 0;
1196     }
1197   }
1198   $child_in->close;
1199
1200   if (!defined $joboutput) {
1201     my $s = IO::Select->new($child_out);
1202     if ($s->can_read(120)) {
1203       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1204       chomp($joboutput);
1205     } else {
1206       Log (undef, "timed out reading from 'arv keep put'");
1207     }
1208   }
1209   waitpid($pid, 0);
1210
1211   if ($joboutput)
1212   {
1213     Log (undef, "output $joboutput");
1214     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1215   }
1216   else
1217   {
1218     Log (undef, "output undef");
1219   }
1220   return $joboutput;
1221 }
1222
1223
1224 sub killem
1225 {
1226   foreach (@_)
1227   {
1228     my $sig = 2;                # SIGINT first
1229     if (exists $proc{$_}->{"sent_$sig"} &&
1230         time - $proc{$_}->{"sent_$sig"} > 4)
1231     {
1232       $sig = 15;                # SIGTERM if SIGINT doesn't work
1233     }
1234     if (exists $proc{$_}->{"sent_$sig"} &&
1235         time - $proc{$_}->{"sent_$sig"} > 4)
1236     {
1237       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1238     }
1239     if (!exists $proc{$_}->{"sent_$sig"})
1240     {
1241       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1242       kill $sig, $_;
1243       select (undef, undef, undef, 0.1);
1244       if ($sig == 2)
1245       {
1246         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1247       }
1248       $proc{$_}->{"sent_$sig"} = time;
1249       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1250     }
1251   }
1252 }
1253
1254
1255 sub fhbits
1256 {
1257   my($bits);
1258   for (@_) {
1259     vec($bits,fileno($_),1) = 1;
1260   }
1261   $bits;
1262 }
1263
1264
1265 sub Log                         # ($jobstep_id, $logmessage)
1266 {
1267   if ($_[1] =~ /\n/) {
1268     for my $line (split (/\n/, $_[1])) {
1269       Log ($_[0], $line);
1270     }
1271     return;
1272   }
1273   my $fh = select STDERR; $|=1; select $fh;
1274   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1275   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1276   $message .= "\n";
1277   my $datetime;
1278   if ($local_logfile || -t STDERR) {
1279     my @gmtime = gmtime;
1280     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1281                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1282   }
1283   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1284
1285   if ($local_logfile) {
1286     print $local_logfile $datetime . " " . $message;
1287   }
1288 }
1289
1290
1291 sub croak
1292 {
1293   my ($package, $file, $line) = caller;
1294   my $message = "@_ at $file line $line\n";
1295   Log (undef, $message);
1296   freeze() if @jobstep_todo;
1297   collate_output() if @jobstep_todo;
1298   cleanup();
1299   save_meta() if $local_logfile;
1300   die;
1301 }
1302
1303
1304 sub cleanup
1305 {
1306   return if !$job_has_uuid;
1307   $Job->update_attributes('running' => 0,
1308                           'success' => 0,
1309                           'finished_at' => scalar gmtime);
1310 }
1311
1312
1313 sub save_meta
1314 {
1315   my $justcheckpoint = shift; # false if this will be the last meta saved
1316   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1317
1318   $local_logfile->flush;
1319   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1320       . quotemeta($local_logfile->filename);
1321   my $loglocator = `$cmd`;
1322   die "system $cmd failed: $?" if $?;
1323   chomp($loglocator);
1324
1325   $local_logfile = undef;   # the temp file is automatically deleted
1326   Log (undef, "log manifest is $loglocator");
1327   $Job->{'log'} = $loglocator;
1328   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1329 }
1330
1331
1332 sub freeze_if_want_freeze
1333 {
1334   if ($main::please_freeze)
1335   {
1336     release_allocation();
1337     if (@_)
1338     {
1339       # kill some srun procs before freeze+stop
1340       map { $proc{$_} = {} } @_;
1341       while (%proc)
1342       {
1343         killem (keys %proc);
1344         select (undef, undef, undef, 0.1);
1345         my $died;
1346         while (($died = waitpid (-1, WNOHANG)) > 0)
1347         {
1348           delete $proc{$died};
1349         }
1350       }
1351     }
1352     freeze();
1353     collate_output();
1354     cleanup();
1355     save_meta();
1356     exit 0;
1357   }
1358 }
1359
1360
1361 sub freeze
1362 {
1363   Log (undef, "Freeze not implemented");
1364   return;
1365 }
1366
1367
1368 sub thaw
1369 {
1370   croak ("Thaw not implemented");
1371 }
1372
1373
1374 sub freezequote
1375 {
1376   my $s = shift;
1377   $s =~ s/\\/\\\\/g;
1378   $s =~ s/\n/\\n/g;
1379   return $s;
1380 }
1381
1382
1383 sub freezeunquote
1384 {
1385   my $s = shift;
1386   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1387   return $s;
1388 }
1389
1390
1391 sub srun
1392 {
1393   my $srunargs = shift;
1394   my $execargs = shift;
1395   my $opts = shift || {};
1396   my $stdin = shift;
1397   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1398   print STDERR (join (" ",
1399                       map { / / ? "'$_'" : $_ }
1400                       (@$args)),
1401                 "\n")
1402       if $ENV{CRUNCH_DEBUG};
1403
1404   if (defined $stdin) {
1405     my $child = open STDIN, "-|";
1406     defined $child or die "no fork: $!";
1407     if ($child == 0) {
1408       print $stdin or die $!;
1409       close STDOUT or die $!;
1410       exit 0;
1411     }
1412   }
1413
1414   return system (@$args) if $opts->{fork};
1415
1416   exec @$args;
1417   warn "ENV size is ".length(join(" ",%ENV));
1418   die "exec failed: $!: @$args";
1419 }
1420
1421
1422 sub ban_node_by_slot {
1423   # Don't start any new jobsteps on this node for 60 seconds
1424   my $slotid = shift;
1425   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1426   $slot[$slotid]->{node}->{hold_count}++;
1427   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1428 }
1429
1430 sub must_lock_now
1431 {
1432   my ($lockfile, $error_message) = @_;
1433   open L, ">", $lockfile or croak("$lockfile: $!");
1434   if (!flock L, LOCK_EX|LOCK_NB) {
1435     croak("Can't lock $lockfile: $error_message\n");
1436   }
1437 }
1438
1439 __DATA__
1440 #!/usr/bin/perl
1441
1442 # checkout-and-build
1443
1444 use Fcntl ':flock';
1445
1446 my $destdir = $ENV{"CRUNCH_SRC"};
1447 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1448 my $repo = $ENV{"CRUNCH_SRC_URL"};
1449
1450 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1451 flock L, LOCK_EX;
1452 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1453     exit 0;
1454 }
1455
1456 unlink "$destdir.commit";
1457 open STDOUT, ">", "$destdir.log";
1458 open STDERR, ">&STDOUT";
1459
1460 mkdir $destdir;
1461 my @git_archive_data = <DATA>;
1462 if (@git_archive_data) {
1463   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1464   print TARX @git_archive_data;
1465   if(!close(TARX)) {
1466     die "'tar -C $destdir -xf -' exited $?: $!";
1467   }
1468 }
1469
1470 my $pwd;
1471 chomp ($pwd = `pwd`);
1472 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1473 mkdir $install_dir;
1474
1475 for my $src_path ("$destdir/arvados/sdk/python") {
1476   if (-d $src_path) {
1477     shell_or_die ("virtualenv", $install_dir);
1478     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1479   }
1480 }
1481
1482 if (-e "$destdir/crunch_scripts/install") {
1483     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1484 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1485     # Old version
1486     shell_or_die ("./tests/autotests.sh", $install_dir);
1487 } elsif (-e "./install.sh") {
1488     shell_or_die ("./install.sh", $install_dir);
1489 }
1490
1491 if ($commit) {
1492     unlink "$destdir.commit.new";
1493     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1494     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1495 }
1496
1497 close L;
1498
1499 exit 0;
1500
1501 sub shell_or_die
1502 {
1503   if ($ENV{"DEBUG"}) {
1504     print STDERR "@_\n";
1505   }
1506   system (@_) == 0
1507       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1508 }
1509
1510 __DATA__