Merge branch 'master' into 2882-job-process-stats refs #2882
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $arv_cli;
100
101 if (defined $ENV{"ARV_CLI"}) {
102   $arv_cli = $ENV{"ARV_CLI"};
103 }
104 else {
105   $arv_cli = 'arv';
106 }
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     if ($Job->{'is_locked_by_uuid'}) {
156       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157     }
158     if ($Job->{'success'} ne undef) {
159       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160     }
161     if ($Job->{'running'}) {
162       croak("Job 'running' flag is already set");
163     }
164     if ($Job->{'started_at'}) {
165       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
166     }
167   }
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172
173   if (!$resume_stash)
174   {
175     map { croak ("No $_ specified") unless $Job->{$_} }
176     qw(script script_version script_parameters);
177   }
178
179   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180   $Job->{'started_at'} = gmtime;
181
182   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183
184   $job_has_uuid = 1;
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271
272 my $jobmanager_id;
273 if ($job_has_uuid)
274 {
275   # Claim this job, and make sure nobody else does
276   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278     croak("Error while updating / locking job");
279   }
280   $Job->update_attributes('started_at' => scalar gmtime,
281                           'running' => 1,
282                           'success' => undef,
283                           'tasks_summary' => { 'failed' => 0,
284                                                'todo' => 1,
285                                                'running' => 0,
286                                                'done' => 0 });
287 }
288
289
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
304
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
308
309
310 my @jobstep;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
315 my $squeue_checked;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
319
320
321
322 if (defined $Job->{thawedfromkey})
323 {
324   thaw ($Job->{thawedfromkey});
325 }
326 else
327 {
328   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329     'job_uuid' => $Job->{'uuid'},
330     'sequence' => 0,
331     'qsequence' => 0,
332     'parameters' => {},
333                                                           });
334   push @jobstep, { 'level' => 0,
335                    'failures' => 0,
336                    'arvados_task' => $first_task,
337                  };
338   push @jobstep_todo, 0;
339 }
340
341
342 if (!$have_slurm)
343 {
344   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
345 }
346
347
348 my $build_script;
349
350
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if ($skip_install)
355 {
356   if (!defined $no_clear_tmp) {
357     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358     system($clear_tmp_cmd) == 0
359         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360   }
361   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363     if (-d $src_path) {
364       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367           == 0
368           or croak ("setup.py in $src_path failed: exit ".($?>>8));
369     }
370   }
371 }
372 else
373 {
374   do {
375     local $/ = undef;
376     $build_script = <DATA>;
377   };
378   Log (undef, "Install revision ".$Job->{script_version});
379   my $nodelist = join(",", @node);
380
381   if (!defined $no_clear_tmp) {
382     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383
384     my $cleanpid = fork();
385     if ($cleanpid == 0)
386     {
387       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
389       exit (1);
390     }
391     while (1)
392     {
393       last if $cleanpid == waitpid (-1, WNOHANG);
394       freeze_if_want_freeze ($cleanpid);
395       select (undef, undef, undef, 0.1);
396     }
397     Log (undef, "Clean-work-dir exited $?");
398   }
399
400   # Install requested code version
401
402   my @execargs;
403   my @srunargs = ("srun",
404                   "--nodelist=$nodelist",
405                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406
407   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
409
410   my $commit;
411   my $git_archive;
412   my $treeish = $Job->{'script_version'};
413
414   # If we're running under crunch-dispatch, it will have pulled the
415   # appropriate source tree into its own repository, and given us that
416   # repo's path as $git_dir. If we're running a "local" job, and a
417   # script_version was specified, it's up to the user to provide the
418   # full path to a local repository in Job->{repository}.
419   #
420   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421   # git-archive --remote where appropriate.
422   #
423   # TODO: Accept a locally-hosted Arvados repository by name or
424   # UUID. Use arvados.v1.repositories.list or .get to figure out the
425   # appropriate fetch-url.
426   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427
428   $ENV{"CRUNCH_SRC_URL"} = $repo;
429
430   if (-d "$repo/.git") {
431     # We were given a working directory, but we are only interested in
432     # the index.
433     $repo = "$repo/.git";
434   }
435
436   # If this looks like a subversion r#, look for it in git-svn commit messages
437
438   if ($treeish =~ m{^\d{1,4}$}) {
439     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440     chomp $gitlog;
441     if ($gitlog =~ /^[a-f0-9]{40}$/) {
442       $commit = $gitlog;
443       Log (undef, "Using commit $commit for script_version $treeish");
444     }
445   }
446
447   # If that didn't work, try asking git to look it up as a tree-ish.
448
449   if (!defined $commit) {
450     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451     chomp $found;
452     if ($found =~ /^[0-9a-f]{40}$/s) {
453       $commit = $found;
454       if ($commit ne $treeish) {
455         # Make sure we record the real commit id in the database,
456         # frozentokey, logs, etc. -- instead of an abbreviation or a
457         # branch name which can become ambiguous or point to a
458         # different commit in the future.
459         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460         Log (undef, "Using commit $commit for tree-ish $treeish");
461         if ($commit ne $treeish) {
462           $Job->{'script_version'} = $commit;
463           !$job_has_uuid or
464               $Job->update_attributes('script_version' => $commit) or
465               croak("Error while updating job");
466         }
467       }
468     }
469   }
470
471   if (defined $commit) {
472     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473     @execargs = ("sh", "-c",
474                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476   }
477   else {
478     croak ("could not figure out commit id for $treeish");
479   }
480
481   my $installpid = fork();
482   if ($installpid == 0)
483   {
484     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
485     exit (1);
486   }
487   while (1)
488   {
489     last if $installpid == waitpid (-1, WNOHANG);
490     freeze_if_want_freeze ($installpid);
491     select (undef, undef, undef, 0.1);
492   }
493   Log (undef, "Install exited $?");
494 }
495
496 if (!$have_slurm)
497 {
498   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 }
501
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";
505 if ($docker_image) {
506   my $docker_pid = fork();
507   if ($docker_pid == 0)
508   {
509     srun (["srun", "--nodelist=" . join(' ', @node)],
510           [$docker_bin, 'pull', $docker_image]);
511     exit ($?);
512   }
513   while (1)
514   {
515     last if $docker_pid == waitpid (-1, WNOHANG);
516     freeze_if_want_freeze ($docker_pid);
517     select (undef, undef, undef, 0.1);
518   }
519   # If the Docker image was specified as a hash, pull will fail.
520   # Ignore that error.  We'll see what happens when we try to run later.
521   if (($? != 0) && ($docker_image !~ /^[0-9a-fA-F]{5,64}$/))
522   {
523     croak("Installing Docker image $docker_image returned exit code $?");
524   }
525 }
526
527 foreach (qw (script script_version script_parameters runtime_constraints))
528 {
529   Log (undef,
530        "$_ " .
531        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
532 }
533 foreach (split (/\n/, $Job->{knobs}))
534 {
535   Log (undef, "knob " . $_);
536 }
537
538
539
540 $main::success = undef;
541
542
543
544 ONELEVEL:
545
546 my $thisround_succeeded = 0;
547 my $thisround_failed = 0;
548 my $thisround_failed_multiple = 0;
549
550 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
551                        or $a <=> $b } @jobstep_todo;
552 my $level = $jobstep[$jobstep_todo[0]]->{level};
553 Log (undef, "start level $level");
554
555
556
557 my %proc;
558 my @freeslot = (0..$#slot);
559 my @holdslot;
560 my %reader;
561 my $progress_is_dirty = 1;
562 my $progress_stats_updated = 0;
563
564 update_progress_stats();
565
566
567
568 THISROUND:
569 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
570 {
571   my $id = $jobstep_todo[$todo_ptr];
572   my $Jobstep = $jobstep[$id];
573   if ($Jobstep->{level} != $level)
574   {
575     next;
576   }
577
578   pipe $reader{$id}, "writer" or croak ($!);
579   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
580   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
581
582   my $childslot = $freeslot[0];
583   my $childnode = $slot[$childslot]->{node};
584   my $childslotname = join (".",
585                             $slot[$childslot]->{node}->{name},
586                             $slot[$childslot]->{cpu});
587   my $childpid = fork();
588   if ($childpid == 0)
589   {
590     $SIG{'INT'} = 'DEFAULT';
591     $SIG{'QUIT'} = 'DEFAULT';
592     $SIG{'TERM'} = 'DEFAULT';
593
594     foreach (values (%reader))
595     {
596       close($_);
597     }
598     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
599     open(STDOUT,">&writer");
600     open(STDERR,">&writer");
601
602     undef $dbh;
603     undef $sth;
604
605     delete $ENV{"GNUPGHOME"};
606     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
607     $ENV{"TASK_QSEQUENCE"} = $id;
608     $ENV{"TASK_SEQUENCE"} = $level;
609     $ENV{"JOB_SCRIPT"} = $Job->{script};
610     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
611       $param =~ tr/a-z/A-Z/;
612       $ENV{"JOB_PARAMETER_$param"} = $value;
613     }
614     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
615     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
616     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
617     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
618     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
619     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
620     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
621
622     $ENV{"GZIP"} = "-n";
623
624     my @srunargs = (
625       "srun",
626       "--nodelist=".$childnode->{name},
627       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
628       "--job-name=$job_id.$id.$$",
629         );
630     my $build_script_to_send = "";
631     my $command =
632         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
633         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
634         ."&& cd $ENV{CRUNCH_TMP} ";
635     if ($build_script)
636     {
637       $build_script_to_send = $build_script;
638       $command .=
639           "&& perl -";
640     }
641     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
642     if ($docker_image)
643     {
644       $command .= "crunchstat -cgroup-parent=/sys/fs/cgroup/lxc -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=1000 ";
645       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr -cidfile=$ENV{TASK_WORK}/docker.cid ";
646       # Dynamically configure the container to use the host system as its
647       # DNS server.  Get the host's global addresses from the ip command,
648       # and turn them into docker --dns options using gawk.
649       $command .=
650           q{$(ip -o address show scope global |
651               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
652       foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
653       {
654         $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
655       }
656       while (my ($env_key, $env_val) = each %ENV)
657       {
658         if ($env_key =~ /^(JOB|TASK)_/) {
659           $command .= "-e \Q$env_key=$env_val\E ";
660         }
661       }
662       $command .= "\Q$docker_image\E ";
663     } else {
664       $command .= "crunchstat -cgroup-path=/sys/fs/cgroup "
665     }
666     $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
667     my @execargs = ('bash', '-c', $command);
668     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
669     exit (111);
670   }
671   close("writer");
672   if (!defined $childpid)
673   {
674     close $reader{$id};
675     delete $reader{$id};
676     next;
677   }
678   shift @freeslot;
679   $proc{$childpid} = { jobstep => $id,
680                        time => time,
681                        slot => $childslot,
682                        jobstepname => "$job_id.$id.$childpid",
683                      };
684   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
685   $slot[$childslot]->{pid} = $childpid;
686
687   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
688   Log ($id, "child $childpid started on $childslotname");
689   $Jobstep->{starttime} = time;
690   $Jobstep->{node} = $childnode->{name};
691   $Jobstep->{slotindex} = $childslot;
692   delete $Jobstep->{stderr};
693   delete $Jobstep->{finishtime};
694
695   splice @jobstep_todo, $todo_ptr, 1;
696   --$todo_ptr;
697
698   $progress_is_dirty = 1;
699
700   while (!@freeslot
701          ||
702          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
703   {
704     last THISROUND if $main::please_freeze;
705     if ($main::please_info)
706     {
707       $main::please_info = 0;
708       freeze();
709       collate_output();
710       save_meta(1);
711       update_progress_stats();
712     }
713     my $gotsome
714         = readfrompipes ()
715         + reapchildren ();
716     if (!$gotsome)
717     {
718       check_refresh_wanted();
719       check_squeue();
720       update_progress_stats();
721       select (undef, undef, undef, 0.1);
722     }
723     elsif (time - $progress_stats_updated >= 30)
724     {
725       update_progress_stats();
726     }
727     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
728         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
729     {
730       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
731           .($thisround_failed+$thisround_succeeded)
732           .") -- giving up on this round";
733       Log (undef, $message);
734       last THISROUND;
735     }
736
737     # move slots from freeslot to holdslot (or back to freeslot) if necessary
738     for (my $i=$#freeslot; $i>=0; $i--) {
739       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
740         push @holdslot, (splice @freeslot, $i, 1);
741       }
742     }
743     for (my $i=$#holdslot; $i>=0; $i--) {
744       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
745         push @freeslot, (splice @holdslot, $i, 1);
746       }
747     }
748
749     # give up if no nodes are succeeding
750     if (!grep { $_->{node}->{losing_streak} == 0 &&
751                     $_->{node}->{hold_count} < 4 } @slot) {
752       my $message = "Every node has failed -- giving up on this round";
753       Log (undef, $message);
754       last THISROUND;
755     }
756   }
757 }
758
759
760 push @freeslot, splice @holdslot;
761 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
762
763
764 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
765 while (%proc)
766 {
767   if ($main::please_continue) {
768     $main::please_continue = 0;
769     goto THISROUND;
770   }
771   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
772   readfrompipes ();
773   if (!reapchildren())
774   {
775     check_refresh_wanted();
776     check_squeue();
777     update_progress_stats();
778     select (undef, undef, undef, 0.1);
779     killem (keys %proc) if $main::please_freeze;
780   }
781 }
782
783 update_progress_stats();
784 freeze_if_want_freeze();
785
786
787 if (!defined $main::success)
788 {
789   if (@jobstep_todo &&
790       $thisround_succeeded == 0 &&
791       ($thisround_failed == 0 || $thisround_failed > 4))
792   {
793     my $message = "stop because $thisround_failed tasks failed and none succeeded";
794     Log (undef, $message);
795     $main::success = 0;
796   }
797   if (!@jobstep_todo)
798   {
799     $main::success = 1;
800   }
801 }
802
803 goto ONELEVEL if !defined $main::success;
804
805
806 release_allocation();
807 freeze();
808 my $collated_output = &collate_output();
809
810 if ($job_has_uuid) {
811   $Job->update_attributes('running' => 0,
812                           'success' => $collated_output && $main::success,
813                           'finished_at' => scalar gmtime)
814 }
815
816 if ($collated_output)
817 {
818   eval {
819     open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
820         or die "failed to get collated manifest: $!";
821     # Read the original manifest, and strip permission hints from it,
822     # so we can put the result in a Collection.
823     my @manifest_lines = ();
824     while (my $manifest_line = <$orig_manifest>) {
825       my @words = split(/ /, $manifest_line, -1);
826       foreach my $ii (0..$#words) {
827         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
828           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
829         }
830       }
831       push(@manifest_lines, join(" ", @words));
832     }
833     my $manifest_text = join("", @manifest_lines);
834     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
835       'uuid' => md5_hex($manifest_text),
836       'manifest_text' => $manifest_text,
837     });
838     $Job->update_attributes('output' => $output->{uuid});
839     if ($Job->{'output_is_persistent'}) {
840       $arv->{'links'}->{'create'}->execute('link' => {
841         'tail_kind' => 'arvados#user',
842         'tail_uuid' => $User->{'uuid'},
843         'head_kind' => 'arvados#collection',
844         'head_uuid' => $Job->{'output'},
845         'link_class' => 'resources',
846         'name' => 'wants',
847       });
848     }
849   };
850   if ($@) {
851     Log (undef, "Failed to register output manifest: $@");
852   }
853 }
854
855 Log (undef, "finish");
856
857 save_meta();
858 exit 0;
859
860
861
862 sub update_progress_stats
863 {
864   $progress_stats_updated = time;
865   return if !$progress_is_dirty;
866   my ($todo, $done, $running) = (scalar @jobstep_todo,
867                                  scalar @jobstep_done,
868                                  scalar @slot - scalar @freeslot - scalar @holdslot);
869   $Job->{'tasks_summary'} ||= {};
870   $Job->{'tasks_summary'}->{'todo'} = $todo;
871   $Job->{'tasks_summary'}->{'done'} = $done;
872   $Job->{'tasks_summary'}->{'running'} = $running;
873   if ($job_has_uuid) {
874     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
875   }
876   Log (undef, "status: $done done, $running running, $todo todo");
877   $progress_is_dirty = 0;
878 }
879
880
881
882 sub reapchildren
883 {
884   my $pid = waitpid (-1, WNOHANG);
885   return 0 if $pid <= 0;
886
887   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
888                   . "."
889                   . $slot[$proc{$pid}->{slot}]->{cpu});
890   my $jobstepid = $proc{$pid}->{jobstep};
891   my $elapsed = time - $proc{$pid}->{time};
892   my $Jobstep = $jobstep[$jobstepid];
893
894   my $childstatus = $?;
895   my $exitvalue = $childstatus >> 8;
896   my $exitinfo = sprintf("exit %d signal %d%s",
897                          $exitvalue,
898                          $childstatus & 127,
899                          ($childstatus & 128 ? ' core dump' : ''));
900   $Jobstep->{'arvados_task'}->reload;
901   my $task_success = $Jobstep->{'arvados_task'}->{success};
902
903   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
904
905   if (!defined $task_success) {
906     # task did not indicate one way or the other --> fail
907     $Jobstep->{'arvados_task'}->{success} = 0;
908     $Jobstep->{'arvados_task'}->save;
909     $task_success = 0;
910   }
911
912   if (!$task_success)
913   {
914     my $temporary_fail;
915     $temporary_fail ||= $Jobstep->{node_fail};
916     $temporary_fail ||= ($exitvalue == 111);
917
918     ++$thisround_failed;
919     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
920
921     # Check for signs of a failed or misconfigured node
922     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
923         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
924       # Don't count this against jobstep failure thresholds if this
925       # node is already suspected faulty and srun exited quickly
926       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
927           $elapsed < 5) {
928         Log ($jobstepid, "blaming failure on suspect node " .
929              $slot[$proc{$pid}->{slot}]->{node}->{name});
930         $temporary_fail ||= 1;
931       }
932       ban_node_by_slot($proc{$pid}->{slot});
933     }
934
935     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
936                              ++$Jobstep->{'failures'},
937                              $temporary_fail ? 'temporary ' : 'permanent',
938                              $elapsed));
939
940     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
941       # Give up on this task, and the whole job
942       $main::success = 0;
943       $main::please_freeze = 1;
944     }
945     else {
946       # Put this task back on the todo queue
947       push @jobstep_todo, $jobstepid;
948     }
949     $Job->{'tasks_summary'}->{'failed'}++;
950   }
951   else
952   {
953     ++$thisround_succeeded;
954     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
955     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
956     push @jobstep_done, $jobstepid;
957     Log ($jobstepid, "success in $elapsed seconds");
958   }
959   $Jobstep->{exitcode} = $childstatus;
960   $Jobstep->{finishtime} = time;
961   process_stderr ($jobstepid, $task_success);
962   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
963
964   close $reader{$jobstepid};
965   delete $reader{$jobstepid};
966   delete $slot[$proc{$pid}->{slot}]->{pid};
967   push @freeslot, $proc{$pid}->{slot};
968   delete $proc{$pid};
969
970   # Load new tasks
971   my $newtask_list = [];
972   my $newtask_results;
973   do {
974     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
975       'where' => {
976         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
977       },
978       'order' => 'qsequence',
979       'offset' => scalar(@$newtask_list),
980     );
981     push(@$newtask_list, @{$newtask_results->{items}});
982   } while (@{$newtask_results->{items}});
983   foreach my $arvados_task (@$newtask_list) {
984     my $jobstep = {
985       'level' => $arvados_task->{'sequence'},
986       'failures' => 0,
987       'arvados_task' => $arvados_task
988     };
989     push @jobstep, $jobstep;
990     push @jobstep_todo, $#jobstep;
991   }
992
993   $progress_is_dirty = 1;
994   1;
995 }
996
997 sub check_refresh_wanted
998 {
999   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1000   if (@stat && $stat[9] > $latest_refresh) {
1001     $latest_refresh = scalar time;
1002     if ($job_has_uuid) {
1003       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1004       for my $attr ('cancelled_at',
1005                     'cancelled_by_user_uuid',
1006                     'cancelled_by_client_uuid') {
1007         $Job->{$attr} = $Job2->{$attr};
1008       }
1009       if ($Job->{'cancelled_at'}) {
1010         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1011              " by user " . $Job->{cancelled_by_user_uuid});
1012         $main::success = 0;
1013         $main::please_freeze = 1;
1014       }
1015     }
1016   }
1017 }
1018
1019 sub check_squeue
1020 {
1021   # return if the kill list was checked <4 seconds ago
1022   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1023   {
1024     return;
1025   }
1026   $squeue_kill_checked = time;
1027
1028   # use killem() on procs whose killtime is reached
1029   for (keys %proc)
1030   {
1031     if (exists $proc{$_}->{killtime}
1032         && $proc{$_}->{killtime} <= time)
1033     {
1034       killem ($_);
1035     }
1036   }
1037
1038   # return if the squeue was checked <60 seconds ago
1039   if (defined $squeue_checked && $squeue_checked > time - 60)
1040   {
1041     return;
1042   }
1043   $squeue_checked = time;
1044
1045   if (!$have_slurm)
1046   {
1047     # here is an opportunity to check for mysterious problems with local procs
1048     return;
1049   }
1050
1051   # get a list of steps still running
1052   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1053   chop @squeue;
1054   if ($squeue[-1] ne "ok")
1055   {
1056     return;
1057   }
1058   pop @squeue;
1059
1060   # which of my jobsteps are running, according to squeue?
1061   my %ok;
1062   foreach (@squeue)
1063   {
1064     if (/^(\d+)\.(\d+) (\S+)/)
1065     {
1066       if ($1 eq $ENV{SLURM_JOBID})
1067       {
1068         $ok{$3} = 1;
1069       }
1070     }
1071   }
1072
1073   # which of my active child procs (>60s old) were not mentioned by squeue?
1074   foreach (keys %proc)
1075   {
1076     if ($proc{$_}->{time} < time - 60
1077         && !exists $ok{$proc{$_}->{jobstepname}}
1078         && !exists $proc{$_}->{killtime})
1079     {
1080       # kill this proc if it hasn't exited in 30 seconds
1081       $proc{$_}->{killtime} = time + 30;
1082     }
1083   }
1084 }
1085
1086
1087 sub release_allocation
1088 {
1089   if ($have_slurm)
1090   {
1091     Log (undef, "release job allocation");
1092     system "scancel $ENV{SLURM_JOBID}";
1093   }
1094 }
1095
1096
1097 sub readfrompipes
1098 {
1099   my $gotsome = 0;
1100   foreach my $job (keys %reader)
1101   {
1102     my $buf;
1103     while (0 < sysread ($reader{$job}, $buf, 8192))
1104     {
1105       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1106       $jobstep[$job]->{stderr} .= $buf;
1107       preprocess_stderr ($job);
1108       if (length ($jobstep[$job]->{stderr}) > 16384)
1109       {
1110         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1111       }
1112       $gotsome = 1;
1113     }
1114   }
1115   return $gotsome;
1116 }
1117
1118
1119 sub preprocess_stderr
1120 {
1121   my $job = shift;
1122
1123   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1124     my $line = $1;
1125     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1126     Log ($job, "stderr $line");
1127     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1128       # whoa.
1129       $main::please_freeze = 1;
1130     }
1131     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1132       $jobstep[$job]->{node_fail} = 1;
1133       ban_node_by_slot($jobstep[$job]->{slotindex});
1134     }
1135   }
1136 }
1137
1138
1139 sub process_stderr
1140 {
1141   my $job = shift;
1142   my $task_success = shift;
1143   preprocess_stderr ($job);
1144
1145   map {
1146     Log ($job, "stderr $_");
1147   } split ("\n", $jobstep[$job]->{stderr});
1148 }
1149
1150 sub fetch_block
1151 {
1152   my $hash = shift;
1153   my ($keep, $child_out, $output_block);
1154
1155   my $cmd = "$arv_cli keep get \Q$hash\E";
1156   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1157   sysread($keep, $output_block, 64 * 1024 * 1024);
1158   close $keep;
1159   return $output_block;
1160 }
1161
1162 sub collate_output
1163 {
1164   Log (undef, "collate");
1165
1166   my ($child_out, $child_in);
1167   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1168   my $joboutput;
1169   for (@jobstep)
1170   {
1171     next if (!exists $_->{'arvados_task'}->{output} ||
1172              !$_->{'arvados_task'}->{'success'} ||
1173              $_->{'exitcode'} != 0);
1174     my $output = $_->{'arvados_task'}->{output};
1175     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1176     {
1177       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1178       print $child_in $output;
1179     }
1180     elsif (@jobstep == 1)
1181     {
1182       $joboutput = $output;
1183       last;
1184     }
1185     elsif (defined (my $outblock = fetch_block ($output)))
1186     {
1187       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1188       print $child_in $outblock;
1189     }
1190     else
1191     {
1192       Log (undef, "XXX fetch_block($output) failed XXX");
1193       $main::success = 0;
1194     }
1195   }
1196   $child_in->close;
1197
1198   if (!defined $joboutput) {
1199     my $s = IO::Select->new($child_out);
1200     if ($s->can_read(120)) {
1201       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1202       chomp($joboutput);
1203     } else {
1204       Log (undef, "timed out reading from 'arv keep put'");
1205     }
1206   }
1207   waitpid($pid, 0);
1208
1209   if ($joboutput)
1210   {
1211     Log (undef, "output $joboutput");
1212     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1213   }
1214   else
1215   {
1216     Log (undef, "output undef");
1217   }
1218   return $joboutput;
1219 }
1220
1221
1222 sub killem
1223 {
1224   foreach (@_)
1225   {
1226     my $sig = 2;                # SIGINT first
1227     if (exists $proc{$_}->{"sent_$sig"} &&
1228         time - $proc{$_}->{"sent_$sig"} > 4)
1229     {
1230       $sig = 15;                # SIGTERM if SIGINT doesn't work
1231     }
1232     if (exists $proc{$_}->{"sent_$sig"} &&
1233         time - $proc{$_}->{"sent_$sig"} > 4)
1234     {
1235       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1236     }
1237     if (!exists $proc{$_}->{"sent_$sig"})
1238     {
1239       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1240       kill $sig, $_;
1241       select (undef, undef, undef, 0.1);
1242       if ($sig == 2)
1243       {
1244         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1245       }
1246       $proc{$_}->{"sent_$sig"} = time;
1247       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1248     }
1249   }
1250 }
1251
1252
1253 sub fhbits
1254 {
1255   my($bits);
1256   for (@_) {
1257     vec($bits,fileno($_),1) = 1;
1258   }
1259   $bits;
1260 }
1261
1262
1263 sub Log                         # ($jobstep_id, $logmessage)
1264 {
1265   if ($_[1] =~ /\n/) {
1266     for my $line (split (/\n/, $_[1])) {
1267       Log ($_[0], $line);
1268     }
1269     return;
1270   }
1271   my $fh = select STDERR; $|=1; select $fh;
1272   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1273   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1274   $message .= "\n";
1275   my $datetime;
1276   if ($local_logfile || -t STDERR) {
1277     my @gmtime = gmtime;
1278     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1279                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1280   }
1281   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1282
1283   if ($local_logfile) {
1284     print $local_logfile $datetime . " " . $message;
1285   }
1286 }
1287
1288
1289 sub croak
1290 {
1291   my ($package, $file, $line) = caller;
1292   my $message = "@_ at $file line $line\n";
1293   Log (undef, $message);
1294   freeze() if @jobstep_todo;
1295   collate_output() if @jobstep_todo;
1296   cleanup();
1297   save_meta() if $local_logfile;
1298   die;
1299 }
1300
1301
1302 sub cleanup
1303 {
1304   return if !$job_has_uuid;
1305   $Job->update_attributes('running' => 0,
1306                           'success' => 0,
1307                           'finished_at' => scalar gmtime);
1308 }
1309
1310
1311 sub save_meta
1312 {
1313   my $justcheckpoint = shift; # false if this will be the last meta saved
1314   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1315
1316   $local_logfile->flush;
1317   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1318       . quotemeta($local_logfile->filename);
1319   my $loglocator = `$cmd`;
1320   die "system $cmd failed: $?" if $?;
1321   chomp($loglocator);
1322
1323   $local_logfile = undef;   # the temp file is automatically deleted
1324   Log (undef, "log manifest is $loglocator");
1325   $Job->{'log'} = $loglocator;
1326   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1327 }
1328
1329
1330 sub freeze_if_want_freeze
1331 {
1332   if ($main::please_freeze)
1333   {
1334     release_allocation();
1335     if (@_)
1336     {
1337       # kill some srun procs before freeze+stop
1338       map { $proc{$_} = {} } @_;
1339       while (%proc)
1340       {
1341         killem (keys %proc);
1342         select (undef, undef, undef, 0.1);
1343         my $died;
1344         while (($died = waitpid (-1, WNOHANG)) > 0)
1345         {
1346           delete $proc{$died};
1347         }
1348       }
1349     }
1350     freeze();
1351     collate_output();
1352     cleanup();
1353     save_meta();
1354     exit 0;
1355   }
1356 }
1357
1358
1359 sub freeze
1360 {
1361   Log (undef, "Freeze not implemented");
1362   return;
1363 }
1364
1365
1366 sub thaw
1367 {
1368   croak ("Thaw not implemented");
1369 }
1370
1371
1372 sub freezequote
1373 {
1374   my $s = shift;
1375   $s =~ s/\\/\\\\/g;
1376   $s =~ s/\n/\\n/g;
1377   return $s;
1378 }
1379
1380
1381 sub freezeunquote
1382 {
1383   my $s = shift;
1384   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1385   return $s;
1386 }
1387
1388
1389 sub srun
1390 {
1391   my $srunargs = shift;
1392   my $execargs = shift;
1393   my $opts = shift || {};
1394   my $stdin = shift;
1395   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1396   print STDERR (join (" ",
1397                       map { / / ? "'$_'" : $_ }
1398                       (@$args)),
1399                 "\n")
1400       if $ENV{CRUNCH_DEBUG};
1401
1402   if (defined $stdin) {
1403     my $child = open STDIN, "-|";
1404     defined $child or die "no fork: $!";
1405     if ($child == 0) {
1406       print $stdin or die $!;
1407       close STDOUT or die $!;
1408       exit 0;
1409     }
1410   }
1411
1412   return system (@$args) if $opts->{fork};
1413
1414   exec @$args;
1415   warn "ENV size is ".length(join(" ",%ENV));
1416   die "exec failed: $!: @$args";
1417 }
1418
1419
1420 sub ban_node_by_slot {
1421   # Don't start any new jobsteps on this node for 60 seconds
1422   my $slotid = shift;
1423   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1424   $slot[$slotid]->{node}->{hold_count}++;
1425   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1426 }
1427
1428 sub must_lock_now
1429 {
1430   my ($lockfile, $error_message) = @_;
1431   open L, ">", $lockfile or croak("$lockfile: $!");
1432   if (!flock L, LOCK_EX|LOCK_NB) {
1433     croak("Can't lock $lockfile: $error_message\n");
1434   }
1435 }
1436
1437 __DATA__
1438 #!/usr/bin/perl
1439
1440 # checkout-and-build
1441
1442 use Fcntl ':flock';
1443
1444 my $destdir = $ENV{"CRUNCH_SRC"};
1445 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1446 my $repo = $ENV{"CRUNCH_SRC_URL"};
1447
1448 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1449 flock L, LOCK_EX;
1450 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1451     exit 0;
1452 }
1453
1454 unlink "$destdir.commit";
1455 open STDOUT, ">", "$destdir.log";
1456 open STDERR, ">&STDOUT";
1457
1458 mkdir $destdir;
1459 my @git_archive_data = <DATA>;
1460 if (@git_archive_data) {
1461   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1462   print TARX @git_archive_data;
1463   if(!close(TARX)) {
1464     die "'tar -C $destdir -xf -' exited $?: $!";
1465   }
1466 }
1467
1468 my $pwd;
1469 chomp ($pwd = `pwd`);
1470 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1471 mkdir $install_dir;
1472
1473 for my $src_path ("$destdir/arvados/sdk/python") {
1474   if (-d $src_path) {
1475     shell_or_die ("virtualenv", $install_dir);
1476     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1477   }
1478 }
1479
1480 if (-e "$destdir/crunch_scripts/install") {
1481     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1482 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1483     # Old version
1484     shell_or_die ("./tests/autotests.sh", $install_dir);
1485 } elsif (-e "./install.sh") {
1486     shell_or_die ("./install.sh", $install_dir);
1487 }
1488
1489 if ($commit) {
1490     unlink "$destdir.commit.new";
1491     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1492     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1493 }
1494
1495 close L;
1496
1497 exit 0;
1498
1499 sub shell_or_die
1500 {
1501   if ($ENV{"DEBUG"}) {
1502     print STDERR "@_\n";
1503   }
1504   system (@_) == 0
1505       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1506 }
1507
1508 __DATA__