2883: Improved pagination control a bit, added search.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $arv_cli;
100
101 if (defined $ENV{"ARV_CLI"}) {
102   $arv_cli = $ENV{"ARV_CLI"};
103 }
104 else {
105   $arv_cli = 'arv';
106 }
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     if ($Job->{'is_locked_by_uuid'}) {
156       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157     }
158     if ($Job->{'success'} ne undef) {
159       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160     }
161     if ($Job->{'running'}) {
162       croak("Job 'running' flag is already set");
163     }
164     if ($Job->{'started_at'}) {
165       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
166     }
167   }
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172
173   if (!$resume_stash)
174   {
175     map { croak ("No $_ specified") unless $Job->{$_} }
176     qw(script script_version script_parameters);
177   }
178
179   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180   $Job->{'started_at'} = gmtime;
181
182   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183
184   $job_has_uuid = 1;
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271
272 my $jobmanager_id;
273 if ($job_has_uuid)
274 {
275   # Claim this job, and make sure nobody else does
276   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278     croak("Error while updating / locking job");
279   }
280   $Job->update_attributes('started_at' => scalar gmtime,
281                           'running' => 1,
282                           'success' => undef,
283                           'tasks_summary' => { 'failed' => 0,
284                                                'todo' => 1,
285                                                'running' => 0,
286                                                'done' => 0 });
287 }
288
289
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
304
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
308
309
310 my @jobstep;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
315 my $squeue_checked;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
319
320
321
322 if (defined $Job->{thawedfromkey})
323 {
324   thaw ($Job->{thawedfromkey});
325 }
326 else
327 {
328   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329     'job_uuid' => $Job->{'uuid'},
330     'sequence' => 0,
331     'qsequence' => 0,
332     'parameters' => {},
333                                                           });
334   push @jobstep, { 'level' => 0,
335                    'failures' => 0,
336                    'arvados_task' => $first_task,
337                  };
338   push @jobstep_todo, 0;
339 }
340
341
342 if (!$have_slurm)
343 {
344   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
345 }
346
347
348 my $build_script;
349
350
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if ($skip_install)
355 {
356   if (!defined $no_clear_tmp) {
357     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358     system($clear_tmp_cmd) == 0
359         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360   }
361   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363     if (-d $src_path) {
364       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367           == 0
368           or croak ("setup.py in $src_path failed: exit ".($?>>8));
369     }
370   }
371 }
372 else
373 {
374   do {
375     local $/ = undef;
376     $build_script = <DATA>;
377   };
378   Log (undef, "Install revision ".$Job->{script_version});
379   my $nodelist = join(",", @node);
380
381   if (!defined $no_clear_tmp) {
382     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383
384     my $cleanpid = fork();
385     if ($cleanpid == 0)
386     {
387       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
389       exit (1);
390     }
391     while (1)
392     {
393       last if $cleanpid == waitpid (-1, WNOHANG);
394       freeze_if_want_freeze ($cleanpid);
395       select (undef, undef, undef, 0.1);
396     }
397     Log (undef, "Clean-work-dir exited $?");
398   }
399
400   # Install requested code version
401
402   my @execargs;
403   my @srunargs = ("srun",
404                   "--nodelist=$nodelist",
405                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406
407   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
409
410   my $commit;
411   my $git_archive;
412   my $treeish = $Job->{'script_version'};
413
414   # If we're running under crunch-dispatch, it will have pulled the
415   # appropriate source tree into its own repository, and given us that
416   # repo's path as $git_dir. If we're running a "local" job, and a
417   # script_version was specified, it's up to the user to provide the
418   # full path to a local repository in Job->{repository}.
419   #
420   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421   # git-archive --remote where appropriate.
422   #
423   # TODO: Accept a locally-hosted Arvados repository by name or
424   # UUID. Use arvados.v1.repositories.list or .get to figure out the
425   # appropriate fetch-url.
426   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427
428   $ENV{"CRUNCH_SRC_URL"} = $repo;
429
430   if (-d "$repo/.git") {
431     # We were given a working directory, but we are only interested in
432     # the index.
433     $repo = "$repo/.git";
434   }
435
436   # If this looks like a subversion r#, look for it in git-svn commit messages
437
438   if ($treeish =~ m{^\d{1,4}$}) {
439     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440     chomp $gitlog;
441     if ($gitlog =~ /^[a-f0-9]{40}$/) {
442       $commit = $gitlog;
443       Log (undef, "Using commit $commit for script_version $treeish");
444     }
445   }
446
447   # If that didn't work, try asking git to look it up as a tree-ish.
448
449   if (!defined $commit) {
450     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451     chomp $found;
452     if ($found =~ /^[0-9a-f]{40}$/s) {
453       $commit = $found;
454       if ($commit ne $treeish) {
455         # Make sure we record the real commit id in the database,
456         # frozentokey, logs, etc. -- instead of an abbreviation or a
457         # branch name which can become ambiguous or point to a
458         # different commit in the future.
459         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460         Log (undef, "Using commit $commit for tree-ish $treeish");
461         if ($commit ne $treeish) {
462           $Job->{'script_version'} = $commit;
463           !$job_has_uuid or
464               $Job->update_attributes('script_version' => $commit) or
465               croak("Error while updating job");
466         }
467       }
468     }
469   }
470
471   if (defined $commit) {
472     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473     @execargs = ("sh", "-c",
474                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476   }
477   else {
478     croak ("could not figure out commit id for $treeish");
479   }
480
481   my $installpid = fork();
482   if ($installpid == 0)
483   {
484     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
485     exit (1);
486   }
487   while (1)
488   {
489     last if $installpid == waitpid (-1, WNOHANG);
490     freeze_if_want_freeze ($installpid);
491     select (undef, undef, undef, 0.1);
492   }
493   Log (undef, "Install exited $?");
494 }
495
496 if (!$have_slurm)
497 {
498   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 }
501
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";
505 if ($docker_image) {
506   my $docker_pid = fork();
507   if ($docker_pid == 0)
508   {
509     srun (["srun", "--nodelist=" . join(' ', @node)],
510           [$docker_bin, 'pull', $docker_image]);
511     exit ($?);
512   }
513   while (1)
514   {
515     last if $docker_pid == waitpid (-1, WNOHANG);
516     freeze_if_want_freeze ($docker_pid);
517     select (undef, undef, undef, 0.1);
518   }
519   # If the Docker image was specified as a hash, pull will fail.
520   # Ignore that error.  We'll see what happens when we try to run later.
521   if (($? != 0) && ($docker_image !~ /^[0-9a-fA-F]{5,64}$/))
522   {
523     croak("Installing Docker image $docker_image returned exit code $?");
524   }
525 }
526
527 foreach (qw (script script_version script_parameters runtime_constraints))
528 {
529   Log (undef,
530        "$_ " .
531        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
532 }
533 foreach (split (/\n/, $Job->{knobs}))
534 {
535   Log (undef, "knob " . $_);
536 }
537
538
539
540 $main::success = undef;
541
542
543
544 ONELEVEL:
545
546 my $thisround_succeeded = 0;
547 my $thisround_failed = 0;
548 my $thisround_failed_multiple = 0;
549
550 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
551                        or $a <=> $b } @jobstep_todo;
552 my $level = $jobstep[$jobstep_todo[0]]->{level};
553 Log (undef, "start level $level");
554
555
556
557 my %proc;
558 my @freeslot = (0..$#slot);
559 my @holdslot;
560 my %reader;
561 my $progress_is_dirty = 1;
562 my $progress_stats_updated = 0;
563
564 update_progress_stats();
565
566
567
568 THISROUND:
569 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
570 {
571   my $id = $jobstep_todo[$todo_ptr];
572   my $Jobstep = $jobstep[$id];
573   if ($Jobstep->{level} != $level)
574   {
575     next;
576   }
577
578   pipe $reader{$id}, "writer" or croak ($!);
579   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
580   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
581
582   my $childslot = $freeslot[0];
583   my $childnode = $slot[$childslot]->{node};
584   my $childslotname = join (".",
585                             $slot[$childslot]->{node}->{name},
586                             $slot[$childslot]->{cpu});
587   my $childpid = fork();
588   if ($childpid == 0)
589   {
590     $SIG{'INT'} = 'DEFAULT';
591     $SIG{'QUIT'} = 'DEFAULT';
592     $SIG{'TERM'} = 'DEFAULT';
593
594     foreach (values (%reader))
595     {
596       close($_);
597     }
598     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
599     open(STDOUT,">&writer");
600     open(STDERR,">&writer");
601
602     undef $dbh;
603     undef $sth;
604
605     delete $ENV{"GNUPGHOME"};
606     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
607     $ENV{"TASK_QSEQUENCE"} = $id;
608     $ENV{"TASK_SEQUENCE"} = $level;
609     $ENV{"JOB_SCRIPT"} = $Job->{script};
610     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
611       $param =~ tr/a-z/A-Z/;
612       $ENV{"JOB_PARAMETER_$param"} = $value;
613     }
614     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
615     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
616     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
617     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
618     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
619     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
620     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
621
622     $ENV{"GZIP"} = "-n";
623
624     my @srunargs = (
625       "srun",
626       "--nodelist=".$childnode->{name},
627       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
628       "--job-name=$job_id.$id.$$",
629         );
630     my $build_script_to_send = "";
631     my $command =
632         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
633         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
634         ."&& cd $ENV{CRUNCH_TMP} ";
635     if ($build_script)
636     {
637       $build_script_to_send = $build_script;
638       $command .=
639           "&& perl -";
640     }
641     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
642     if ($docker_image)
643     {
644       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
645       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr --cidfile=$ENV{TASK_WORK}/docker.cid ";
646       # Dynamically configure the container to use the host system as its
647       # DNS server.  Get the host's global addresses from the ip command,
648       # and turn them into docker --dns options using gawk.
649       $command .=
650           q{$(ip -o address show scope global |
651               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
652       foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
653       {
654         $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
655       }
656       while (my ($env_key, $env_val) = each %ENV)
657       {
658         if ($env_key =~ /^(JOB|TASK)_/) {
659           $command .= "-e \Q$env_key=$env_val\E ";
660         }
661       }
662       $command .= "\Q$docker_image\E ";
663     } else {
664       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "
665     }
666     $command .= "stdbuf -o0 -e0 ";
667     $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
668     my @execargs = ('bash', '-c', $command);
669     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
670     exit (111);
671   }
672   close("writer");
673   if (!defined $childpid)
674   {
675     close $reader{$id};
676     delete $reader{$id};
677     next;
678   }
679   shift @freeslot;
680   $proc{$childpid} = { jobstep => $id,
681                        time => time,
682                        slot => $childslot,
683                        jobstepname => "$job_id.$id.$childpid",
684                      };
685   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
686   $slot[$childslot]->{pid} = $childpid;
687
688   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
689   Log ($id, "child $childpid started on $childslotname");
690   $Jobstep->{starttime} = time;
691   $Jobstep->{node} = $childnode->{name};
692   $Jobstep->{slotindex} = $childslot;
693   delete $Jobstep->{stderr};
694   delete $Jobstep->{finishtime};
695
696   splice @jobstep_todo, $todo_ptr, 1;
697   --$todo_ptr;
698
699   $progress_is_dirty = 1;
700
701   while (!@freeslot
702          ||
703          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
704   {
705     last THISROUND if $main::please_freeze;
706     if ($main::please_info)
707     {
708       $main::please_info = 0;
709       freeze();
710       collate_output();
711       save_meta(1);
712       update_progress_stats();
713     }
714     my $gotsome
715         = readfrompipes ()
716         + reapchildren ();
717     if (!$gotsome)
718     {
719       check_refresh_wanted();
720       check_squeue();
721       update_progress_stats();
722       select (undef, undef, undef, 0.1);
723     }
724     elsif (time - $progress_stats_updated >= 30)
725     {
726       update_progress_stats();
727     }
728     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
729         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
730     {
731       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
732           .($thisround_failed+$thisround_succeeded)
733           .") -- giving up on this round";
734       Log (undef, $message);
735       last THISROUND;
736     }
737
738     # move slots from freeslot to holdslot (or back to freeslot) if necessary
739     for (my $i=$#freeslot; $i>=0; $i--) {
740       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
741         push @holdslot, (splice @freeslot, $i, 1);
742       }
743     }
744     for (my $i=$#holdslot; $i>=0; $i--) {
745       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
746         push @freeslot, (splice @holdslot, $i, 1);
747       }
748     }
749
750     # give up if no nodes are succeeding
751     if (!grep { $_->{node}->{losing_streak} == 0 &&
752                     $_->{node}->{hold_count} < 4 } @slot) {
753       my $message = "Every node has failed -- giving up on this round";
754       Log (undef, $message);
755       last THISROUND;
756     }
757   }
758 }
759
760
761 push @freeslot, splice @holdslot;
762 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
763
764
765 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
766 while (%proc)
767 {
768   if ($main::please_continue) {
769     $main::please_continue = 0;
770     goto THISROUND;
771   }
772   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
773   readfrompipes ();
774   if (!reapchildren())
775   {
776     check_refresh_wanted();
777     check_squeue();
778     update_progress_stats();
779     select (undef, undef, undef, 0.1);
780     killem (keys %proc) if $main::please_freeze;
781   }
782 }
783
784 update_progress_stats();
785 freeze_if_want_freeze();
786
787
788 if (!defined $main::success)
789 {
790   if (@jobstep_todo &&
791       $thisround_succeeded == 0 &&
792       ($thisround_failed == 0 || $thisround_failed > 4))
793   {
794     my $message = "stop because $thisround_failed tasks failed and none succeeded";
795     Log (undef, $message);
796     $main::success = 0;
797   }
798   if (!@jobstep_todo)
799   {
800     $main::success = 1;
801   }
802 }
803
804 goto ONELEVEL if !defined $main::success;
805
806
807 release_allocation();
808 freeze();
809 my $collated_output = &collate_output();
810
811 if ($job_has_uuid) {
812   $Job->update_attributes('running' => 0,
813                           'success' => $collated_output && $main::success,
814                           'finished_at' => scalar gmtime)
815 }
816
817 if ($collated_output)
818 {
819   eval {
820     open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
821         or die "failed to get collated manifest: $!";
822     # Read the original manifest, and strip permission hints from it,
823     # so we can put the result in a Collection.
824     my @stripped_manifest_lines = ();
825     my $orig_manifest_text = '';
826     while (my $manifest_line = <$orig_manifest>) {
827       $orig_manifest_text .= $manifest_line;
828       my @words = split(/ /, $manifest_line, -1);
829       foreach my $ii (0..$#words) {
830         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
831           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
832         }
833       }
834       push(@stripped_manifest_lines, join(" ", @words));
835     }
836     my $stripped_manifest_text = join("", @stripped_manifest_lines);
837     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
838       'uuid' => md5_hex($stripped_manifest_text),
839       'manifest_text' => $orig_manifest_text,
840     });
841     $Job->update_attributes('output' => $output->{uuid});
842     if ($Job->{'output_is_persistent'}) {
843       $arv->{'links'}->{'create'}->execute('link' => {
844         'tail_kind' => 'arvados#user',
845         'tail_uuid' => $User->{'uuid'},
846         'head_kind' => 'arvados#collection',
847         'head_uuid' => $Job->{'output'},
848         'link_class' => 'resources',
849         'name' => 'wants',
850       });
851     }
852   };
853   if ($@) {
854     Log (undef, "Failed to register output manifest: $@");
855   }
856 }
857
858 Log (undef, "finish");
859
860 save_meta();
861 exit 0;
862
863
864
865 sub update_progress_stats
866 {
867   $progress_stats_updated = time;
868   return if !$progress_is_dirty;
869   my ($todo, $done, $running) = (scalar @jobstep_todo,
870                                  scalar @jobstep_done,
871                                  scalar @slot - scalar @freeslot - scalar @holdslot);
872   $Job->{'tasks_summary'} ||= {};
873   $Job->{'tasks_summary'}->{'todo'} = $todo;
874   $Job->{'tasks_summary'}->{'done'} = $done;
875   $Job->{'tasks_summary'}->{'running'} = $running;
876   if ($job_has_uuid) {
877     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
878   }
879   Log (undef, "status: $done done, $running running, $todo todo");
880   $progress_is_dirty = 0;
881 }
882
883
884
885 sub reapchildren
886 {
887   my $pid = waitpid (-1, WNOHANG);
888   return 0 if $pid <= 0;
889
890   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
891                   . "."
892                   . $slot[$proc{$pid}->{slot}]->{cpu});
893   my $jobstepid = $proc{$pid}->{jobstep};
894   my $elapsed = time - $proc{$pid}->{time};
895   my $Jobstep = $jobstep[$jobstepid];
896
897   my $childstatus = $?;
898   my $exitvalue = $childstatus >> 8;
899   my $exitinfo = sprintf("exit %d signal %d%s",
900                          $exitvalue,
901                          $childstatus & 127,
902                          ($childstatus & 128 ? ' core dump' : ''));
903   $Jobstep->{'arvados_task'}->reload;
904   my $task_success = $Jobstep->{'arvados_task'}->{success};
905
906   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
907
908   if (!defined $task_success) {
909     # task did not indicate one way or the other --> fail
910     $Jobstep->{'arvados_task'}->{success} = 0;
911     $Jobstep->{'arvados_task'}->save;
912     $task_success = 0;
913   }
914
915   if (!$task_success)
916   {
917     my $temporary_fail;
918     $temporary_fail ||= $Jobstep->{node_fail};
919     $temporary_fail ||= ($exitvalue == 111);
920
921     ++$thisround_failed;
922     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
923
924     # Check for signs of a failed or misconfigured node
925     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
926         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
927       # Don't count this against jobstep failure thresholds if this
928       # node is already suspected faulty and srun exited quickly
929       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
930           $elapsed < 5) {
931         Log ($jobstepid, "blaming failure on suspect node " .
932              $slot[$proc{$pid}->{slot}]->{node}->{name});
933         $temporary_fail ||= 1;
934       }
935       ban_node_by_slot($proc{$pid}->{slot});
936     }
937
938     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
939                              ++$Jobstep->{'failures'},
940                              $temporary_fail ? 'temporary ' : 'permanent',
941                              $elapsed));
942
943     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
944       # Give up on this task, and the whole job
945       $main::success = 0;
946       $main::please_freeze = 1;
947     }
948     else {
949       # Put this task back on the todo queue
950       push @jobstep_todo, $jobstepid;
951     }
952     $Job->{'tasks_summary'}->{'failed'}++;
953   }
954   else
955   {
956     ++$thisround_succeeded;
957     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
958     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
959     push @jobstep_done, $jobstepid;
960     Log ($jobstepid, "success in $elapsed seconds");
961   }
962   $Jobstep->{exitcode} = $childstatus;
963   $Jobstep->{finishtime} = time;
964   process_stderr ($jobstepid, $task_success);
965   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
966
967   close $reader{$jobstepid};
968   delete $reader{$jobstepid};
969   delete $slot[$proc{$pid}->{slot}]->{pid};
970   push @freeslot, $proc{$pid}->{slot};
971   delete $proc{$pid};
972
973   # Load new tasks
974   my $newtask_list = [];
975   my $newtask_results;
976   do {
977     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
978       'where' => {
979         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
980       },
981       'order' => 'qsequence',
982       'offset' => scalar(@$newtask_list),
983     );
984     push(@$newtask_list, @{$newtask_results->{items}});
985   } while (@{$newtask_results->{items}});
986   foreach my $arvados_task (@$newtask_list) {
987     my $jobstep = {
988       'level' => $arvados_task->{'sequence'},
989       'failures' => 0,
990       'arvados_task' => $arvados_task
991     };
992     push @jobstep, $jobstep;
993     push @jobstep_todo, $#jobstep;
994   }
995
996   $progress_is_dirty = 1;
997   1;
998 }
999
1000 sub check_refresh_wanted
1001 {
1002   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1003   if (@stat && $stat[9] > $latest_refresh) {
1004     $latest_refresh = scalar time;
1005     if ($job_has_uuid) {
1006       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1007       for my $attr ('cancelled_at',
1008                     'cancelled_by_user_uuid',
1009                     'cancelled_by_client_uuid') {
1010         $Job->{$attr} = $Job2->{$attr};
1011       }
1012       if ($Job->{'cancelled_at'}) {
1013         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1014              " by user " . $Job->{cancelled_by_user_uuid});
1015         $main::success = 0;
1016         $main::please_freeze = 1;
1017       }
1018     }
1019   }
1020 }
1021
1022 sub check_squeue
1023 {
1024   # return if the kill list was checked <4 seconds ago
1025   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1026   {
1027     return;
1028   }
1029   $squeue_kill_checked = time;
1030
1031   # use killem() on procs whose killtime is reached
1032   for (keys %proc)
1033   {
1034     if (exists $proc{$_}->{killtime}
1035         && $proc{$_}->{killtime} <= time)
1036     {
1037       killem ($_);
1038     }
1039   }
1040
1041   # return if the squeue was checked <60 seconds ago
1042   if (defined $squeue_checked && $squeue_checked > time - 60)
1043   {
1044     return;
1045   }
1046   $squeue_checked = time;
1047
1048   if (!$have_slurm)
1049   {
1050     # here is an opportunity to check for mysterious problems with local procs
1051     return;
1052   }
1053
1054   # get a list of steps still running
1055   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1056   chop @squeue;
1057   if ($squeue[-1] ne "ok")
1058   {
1059     return;
1060   }
1061   pop @squeue;
1062
1063   # which of my jobsteps are running, according to squeue?
1064   my %ok;
1065   foreach (@squeue)
1066   {
1067     if (/^(\d+)\.(\d+) (\S+)/)
1068     {
1069       if ($1 eq $ENV{SLURM_JOBID})
1070       {
1071         $ok{$3} = 1;
1072       }
1073     }
1074   }
1075
1076   # which of my active child procs (>60s old) were not mentioned by squeue?
1077   foreach (keys %proc)
1078   {
1079     if ($proc{$_}->{time} < time - 60
1080         && !exists $ok{$proc{$_}->{jobstepname}}
1081         && !exists $proc{$_}->{killtime})
1082     {
1083       # kill this proc if it hasn't exited in 30 seconds
1084       $proc{$_}->{killtime} = time + 30;
1085     }
1086   }
1087 }
1088
1089
1090 sub release_allocation
1091 {
1092   if ($have_slurm)
1093   {
1094     Log (undef, "release job allocation");
1095     system "scancel $ENV{SLURM_JOBID}";
1096   }
1097 }
1098
1099
1100 sub readfrompipes
1101 {
1102   my $gotsome = 0;
1103   foreach my $job (keys %reader)
1104   {
1105     my $buf;
1106     while (0 < sysread ($reader{$job}, $buf, 8192))
1107     {
1108       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1109       $jobstep[$job]->{stderr} .= $buf;
1110       preprocess_stderr ($job);
1111       if (length ($jobstep[$job]->{stderr}) > 16384)
1112       {
1113         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1114       }
1115       $gotsome = 1;
1116     }
1117   }
1118   return $gotsome;
1119 }
1120
1121
1122 sub preprocess_stderr
1123 {
1124   my $job = shift;
1125
1126   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1127     my $line = $1;
1128     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1129     Log ($job, "stderr $line");
1130     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1131       # whoa.
1132       $main::please_freeze = 1;
1133     }
1134     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1135       $jobstep[$job]->{node_fail} = 1;
1136       ban_node_by_slot($jobstep[$job]->{slotindex});
1137     }
1138   }
1139 }
1140
1141
1142 sub process_stderr
1143 {
1144   my $job = shift;
1145   my $task_success = shift;
1146   preprocess_stderr ($job);
1147
1148   map {
1149     Log ($job, "stderr $_");
1150   } split ("\n", $jobstep[$job]->{stderr});
1151 }
1152
1153 sub fetch_block
1154 {
1155   my $hash = shift;
1156   my ($keep, $child_out, $output_block);
1157
1158   my $cmd = "$arv_cli keep get \Q$hash\E";
1159   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1160   sysread($keep, $output_block, 64 * 1024 * 1024);
1161   close $keep;
1162   return $output_block;
1163 }
1164
1165 sub collate_output
1166 {
1167   Log (undef, "collate");
1168
1169   my ($child_out, $child_in);
1170   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1171   my $joboutput;
1172   for (@jobstep)
1173   {
1174     next if (!exists $_->{'arvados_task'}->{output} ||
1175              !$_->{'arvados_task'}->{'success'} ||
1176              $_->{'exitcode'} != 0);
1177     my $output = $_->{'arvados_task'}->{output};
1178     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1179     {
1180       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1181       print $child_in $output;
1182     }
1183     elsif (@jobstep == 1)
1184     {
1185       $joboutput = $output;
1186       last;
1187     }
1188     elsif (defined (my $outblock = fetch_block ($output)))
1189     {
1190       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1191       print $child_in $outblock;
1192     }
1193     else
1194     {
1195       Log (undef, "XXX fetch_block($output) failed XXX");
1196       $main::success = 0;
1197     }
1198   }
1199   $child_in->close;
1200
1201   if (!defined $joboutput) {
1202     my $s = IO::Select->new($child_out);
1203     if ($s->can_read(120)) {
1204       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1205       chomp($joboutput);
1206     } else {
1207       Log (undef, "timed out reading from 'arv keep put'");
1208     }
1209   }
1210   waitpid($pid, 0);
1211
1212   if ($joboutput)
1213   {
1214     Log (undef, "output $joboutput");
1215     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1216   }
1217   else
1218   {
1219     Log (undef, "output undef");
1220   }
1221   return $joboutput;
1222 }
1223
1224
1225 sub killem
1226 {
1227   foreach (@_)
1228   {
1229     my $sig = 2;                # SIGINT first
1230     if (exists $proc{$_}->{"sent_$sig"} &&
1231         time - $proc{$_}->{"sent_$sig"} > 4)
1232     {
1233       $sig = 15;                # SIGTERM if SIGINT doesn't work
1234     }
1235     if (exists $proc{$_}->{"sent_$sig"} &&
1236         time - $proc{$_}->{"sent_$sig"} > 4)
1237     {
1238       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1239     }
1240     if (!exists $proc{$_}->{"sent_$sig"})
1241     {
1242       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1243       kill $sig, $_;
1244       select (undef, undef, undef, 0.1);
1245       if ($sig == 2)
1246       {
1247         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1248       }
1249       $proc{$_}->{"sent_$sig"} = time;
1250       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1251     }
1252   }
1253 }
1254
1255
1256 sub fhbits
1257 {
1258   my($bits);
1259   for (@_) {
1260     vec($bits,fileno($_),1) = 1;
1261   }
1262   $bits;
1263 }
1264
1265
1266 sub Log                         # ($jobstep_id, $logmessage)
1267 {
1268   if ($_[1] =~ /\n/) {
1269     for my $line (split (/\n/, $_[1])) {
1270       Log ($_[0], $line);
1271     }
1272     return;
1273   }
1274   my $fh = select STDERR; $|=1; select $fh;
1275   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1276   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1277   $message .= "\n";
1278   my $datetime;
1279   if ($local_logfile || -t STDERR) {
1280     my @gmtime = gmtime;
1281     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1282                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1283   }
1284   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1285
1286   if ($local_logfile) {
1287     print $local_logfile $datetime . " " . $message;
1288   }
1289 }
1290
1291
1292 sub croak
1293 {
1294   my ($package, $file, $line) = caller;
1295   my $message = "@_ at $file line $line\n";
1296   Log (undef, $message);
1297   freeze() if @jobstep_todo;
1298   collate_output() if @jobstep_todo;
1299   cleanup();
1300   save_meta() if $local_logfile;
1301   die;
1302 }
1303
1304
1305 sub cleanup
1306 {
1307   return if !$job_has_uuid;
1308   $Job->update_attributes('running' => 0,
1309                           'success' => 0,
1310                           'finished_at' => scalar gmtime);
1311 }
1312
1313
1314 sub save_meta
1315 {
1316   my $justcheckpoint = shift; # false if this will be the last meta saved
1317   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1318
1319   $local_logfile->flush;
1320   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1321       . quotemeta($local_logfile->filename);
1322   my $loglocator = `$cmd`;
1323   die "system $cmd failed: $?" if $?;
1324   chomp($loglocator);
1325
1326   $local_logfile = undef;   # the temp file is automatically deleted
1327   Log (undef, "log manifest is $loglocator");
1328   $Job->{'log'} = $loglocator;
1329   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1330 }
1331
1332
1333 sub freeze_if_want_freeze
1334 {
1335   if ($main::please_freeze)
1336   {
1337     release_allocation();
1338     if (@_)
1339     {
1340       # kill some srun procs before freeze+stop
1341       map { $proc{$_} = {} } @_;
1342       while (%proc)
1343       {
1344         killem (keys %proc);
1345         select (undef, undef, undef, 0.1);
1346         my $died;
1347         while (($died = waitpid (-1, WNOHANG)) > 0)
1348         {
1349           delete $proc{$died};
1350         }
1351       }
1352     }
1353     freeze();
1354     collate_output();
1355     cleanup();
1356     save_meta();
1357     exit 0;
1358   }
1359 }
1360
1361
1362 sub freeze
1363 {
1364   Log (undef, "Freeze not implemented");
1365   return;
1366 }
1367
1368
1369 sub thaw
1370 {
1371   croak ("Thaw not implemented");
1372 }
1373
1374
1375 sub freezequote
1376 {
1377   my $s = shift;
1378   $s =~ s/\\/\\\\/g;
1379   $s =~ s/\n/\\n/g;
1380   return $s;
1381 }
1382
1383
1384 sub freezeunquote
1385 {
1386   my $s = shift;
1387   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1388   return $s;
1389 }
1390
1391
1392 sub srun
1393 {
1394   my $srunargs = shift;
1395   my $execargs = shift;
1396   my $opts = shift || {};
1397   my $stdin = shift;
1398   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1399   print STDERR (join (" ",
1400                       map { / / ? "'$_'" : $_ }
1401                       (@$args)),
1402                 "\n")
1403       if $ENV{CRUNCH_DEBUG};
1404
1405   if (defined $stdin) {
1406     my $child = open STDIN, "-|";
1407     defined $child or die "no fork: $!";
1408     if ($child == 0) {
1409       print $stdin or die $!;
1410       close STDOUT or die $!;
1411       exit 0;
1412     }
1413   }
1414
1415   return system (@$args) if $opts->{fork};
1416
1417   exec @$args;
1418   warn "ENV size is ".length(join(" ",%ENV));
1419   die "exec failed: $!: @$args";
1420 }
1421
1422
1423 sub ban_node_by_slot {
1424   # Don't start any new jobsteps on this node for 60 seconds
1425   my $slotid = shift;
1426   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1427   $slot[$slotid]->{node}->{hold_count}++;
1428   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1429 }
1430
1431 sub must_lock_now
1432 {
1433   my ($lockfile, $error_message) = @_;
1434   open L, ">", $lockfile or croak("$lockfile: $!");
1435   if (!flock L, LOCK_EX|LOCK_NB) {
1436     croak("Can't lock $lockfile: $error_message\n");
1437   }
1438 }
1439
1440 __DATA__
1441 #!/usr/bin/perl
1442
1443 # checkout-and-build
1444
1445 use Fcntl ':flock';
1446
1447 my $destdir = $ENV{"CRUNCH_SRC"};
1448 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1449 my $repo = $ENV{"CRUNCH_SRC_URL"};
1450
1451 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1452 flock L, LOCK_EX;
1453 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1454     exit 0;
1455 }
1456
1457 unlink "$destdir.commit";
1458 open STDOUT, ">", "$destdir.log";
1459 open STDERR, ">&STDOUT";
1460
1461 mkdir $destdir;
1462 my @git_archive_data = <DATA>;
1463 if (@git_archive_data) {
1464   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1465   print TARX @git_archive_data;
1466   if(!close(TARX)) {
1467     die "'tar -C $destdir -xf -' exited $?: $!";
1468   }
1469 }
1470
1471 my $pwd;
1472 chomp ($pwd = `pwd`);
1473 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1474 mkdir $install_dir;
1475
1476 for my $src_path ("$destdir/arvados/sdk/python") {
1477   if (-d $src_path) {
1478     shell_or_die ("virtualenv", $install_dir);
1479     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1480   }
1481 }
1482
1483 if (-e "$destdir/crunch_scripts/install") {
1484     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1485 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1486     # Old version
1487     shell_or_die ("./tests/autotests.sh", $install_dir);
1488 } elsif (-e "./install.sh") {
1489     shell_or_die ("./install.sh", $install_dir);
1490 }
1491
1492 if ($commit) {
1493     unlink "$destdir.commit.new";
1494     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1495     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1496 }
1497
1498 close L;
1499
1500 exit 0;
1501
1502 sub shell_or_die
1503 {
1504   if ($ENV{"DEBUG"}) {
1505     print STDERR "@_\n";
1506   }
1507   system (@_) == 0
1508       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1509 }
1510
1511 __DATA__