Expanded search path for cgroup stats, changed command line interface a bit.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $arv_cli;
100
101 if (defined $ENV{"ARV_CLI"}) {
102   $arv_cli = $ENV{"ARV_CLI"};
103 }
104 else {
105   $arv_cli = 'arv';
106 }
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     if ($Job->{'is_locked_by_uuid'}) {
156       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157     }
158     if ($Job->{'success'} ne undef) {
159       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160     }
161     if ($Job->{'running'}) {
162       croak("Job 'running' flag is already set");
163     }
164     if ($Job->{'started_at'}) {
165       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
166     }
167   }
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172
173   if (!$resume_stash)
174   {
175     map { croak ("No $_ specified") unless $Job->{$_} }
176     qw(script script_version script_parameters);
177   }
178
179   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180   $Job->{'started_at'} = gmtime;
181
182   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183
184   $job_has_uuid = 1;
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271
272 my $jobmanager_id;
273 if ($job_has_uuid)
274 {
275   # Claim this job, and make sure nobody else does
276   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278     croak("Error while updating / locking job");
279   }
280   $Job->update_attributes('started_at' => scalar gmtime,
281                           'running' => 1,
282                           'success' => undef,
283                           'tasks_summary' => { 'failed' => 0,
284                                                'todo' => 1,
285                                                'running' => 0,
286                                                'done' => 0 });
287 }
288
289
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
304
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
308
309
310 my @jobstep;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
315 my $squeue_checked;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
319
320
321
322 if (defined $Job->{thawedfromkey})
323 {
324   thaw ($Job->{thawedfromkey});
325 }
326 else
327 {
328   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329     'job_uuid' => $Job->{'uuid'},
330     'sequence' => 0,
331     'qsequence' => 0,
332     'parameters' => {},
333                                                           });
334   push @jobstep, { 'level' => 0,
335                    'failures' => 0,
336                    'arvados_task' => $first_task,
337                  };
338   push @jobstep_todo, 0;
339 }
340
341
342 if (!$have_slurm)
343 {
344   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
345 }
346
347
348 my $build_script;
349
350
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if ($skip_install)
355 {
356   if (!defined $no_clear_tmp) {
357     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358     system($clear_tmp_cmd) == 0
359         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360   }
361   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363     if (-d $src_path) {
364       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367           == 0
368           or croak ("setup.py in $src_path failed: exit ".($?>>8));
369     }
370   }
371 }
372 else
373 {
374   do {
375     local $/ = undef;
376     $build_script = <DATA>;
377   };
378   Log (undef, "Install revision ".$Job->{script_version});
379   my $nodelist = join(",", @node);
380
381   if (!defined $no_clear_tmp) {
382     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383
384     my $cleanpid = fork();
385     if ($cleanpid == 0)
386     {
387       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
389       exit (1);
390     }
391     while (1)
392     {
393       last if $cleanpid == waitpid (-1, WNOHANG);
394       freeze_if_want_freeze ($cleanpid);
395       select (undef, undef, undef, 0.1);
396     }
397     Log (undef, "Clean-work-dir exited $?");
398   }
399
400   # Install requested code version
401
402   my @execargs;
403   my @srunargs = ("srun",
404                   "--nodelist=$nodelist",
405                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406
407   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
409
410   my $commit;
411   my $git_archive;
412   my $treeish = $Job->{'script_version'};
413
414   # If we're running under crunch-dispatch, it will have pulled the
415   # appropriate source tree into its own repository, and given us that
416   # repo's path as $git_dir. If we're running a "local" job, and a
417   # script_version was specified, it's up to the user to provide the
418   # full path to a local repository in Job->{repository}.
419   #
420   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421   # git-archive --remote where appropriate.
422   #
423   # TODO: Accept a locally-hosted Arvados repository by name or
424   # UUID. Use arvados.v1.repositories.list or .get to figure out the
425   # appropriate fetch-url.
426   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427
428   $ENV{"CRUNCH_SRC_URL"} = $repo;
429
430   if (-d "$repo/.git") {
431     # We were given a working directory, but we are only interested in
432     # the index.
433     $repo = "$repo/.git";
434   }
435
436   # If this looks like a subversion r#, look for it in git-svn commit messages
437
438   if ($treeish =~ m{^\d{1,4}$}) {
439     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440     chomp $gitlog;
441     if ($gitlog =~ /^[a-f0-9]{40}$/) {
442       $commit = $gitlog;
443       Log (undef, "Using commit $commit for script_version $treeish");
444     }
445   }
446
447   # If that didn't work, try asking git to look it up as a tree-ish.
448
449   if (!defined $commit) {
450     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451     chomp $found;
452     if ($found =~ /^[0-9a-f]{40}$/s) {
453       $commit = $found;
454       if ($commit ne $treeish) {
455         # Make sure we record the real commit id in the database,
456         # frozentokey, logs, etc. -- instead of an abbreviation or a
457         # branch name which can become ambiguous or point to a
458         # different commit in the future.
459         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460         Log (undef, "Using commit $commit for tree-ish $treeish");
461         if ($commit ne $treeish) {
462           $Job->{'script_version'} = $commit;
463           !$job_has_uuid or
464               $Job->update_attributes('script_version' => $commit) or
465               croak("Error while updating job");
466         }
467       }
468     }
469   }
470
471   if (defined $commit) {
472     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473     @execargs = ("sh", "-c",
474                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476   }
477   else {
478     croak ("could not figure out commit id for $treeish");
479   }
480
481   my $installpid = fork();
482   if ($installpid == 0)
483   {
484     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
485     exit (1);
486   }
487   while (1)
488   {
489     last if $installpid == waitpid (-1, WNOHANG);
490     freeze_if_want_freeze ($installpid);
491     select (undef, undef, undef, 0.1);
492   }
493   Log (undef, "Install exited $?");
494 }
495
496 if (!$have_slurm)
497 {
498   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 }
501
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";
505 if ($docker_image) {
506   my $docker_pid = fork();
507   if ($docker_pid == 0)
508   {
509     srun (["srun", "--nodelist=" . join(' ', @node)],
510           [$docker_bin, 'pull', $docker_image]);
511     exit ($?);
512   }
513   while (1)
514   {
515     last if $docker_pid == waitpid (-1, WNOHANG);
516     freeze_if_want_freeze ($docker_pid);
517     select (undef, undef, undef, 0.1);
518   }
519   # If the Docker image was specified as a hash, pull will fail.
520   # Ignore that error.  We'll see what happens when we try to run later.
521   if (($? != 0) && ($docker_image !~ /^[0-9a-fA-F]{5,64}$/))
522   {
523     croak("Installing Docker image $docker_image returned exit code $?");
524   }
525 }
526
527 foreach (qw (script script_version script_parameters runtime_constraints))
528 {
529   Log (undef,
530        "$_ " .
531        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
532 }
533 foreach (split (/\n/, $Job->{knobs}))
534 {
535   Log (undef, "knob " . $_);
536 }
537
538
539
540 $main::success = undef;
541
542
543
544 ONELEVEL:
545
546 my $thisround_succeeded = 0;
547 my $thisround_failed = 0;
548 my $thisround_failed_multiple = 0;
549
550 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
551                        or $a <=> $b } @jobstep_todo;
552 my $level = $jobstep[$jobstep_todo[0]]->{level};
553 Log (undef, "start level $level");
554
555
556
557 my %proc;
558 my @freeslot = (0..$#slot);
559 my @holdslot;
560 my %reader;
561 my $progress_is_dirty = 1;
562 my $progress_stats_updated = 0;
563
564 update_progress_stats();
565
566
567
568 THISROUND:
569 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
570 {
571   my $id = $jobstep_todo[$todo_ptr];
572   my $Jobstep = $jobstep[$id];
573   if ($Jobstep->{level} != $level)
574   {
575     next;
576   }
577
578   pipe $reader{$id}, "writer" or croak ($!);
579   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
580   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
581
582   my $childslot = $freeslot[0];
583   my $childnode = $slot[$childslot]->{node};
584   my $childslotname = join (".",
585                             $slot[$childslot]->{node}->{name},
586                             $slot[$childslot]->{cpu});
587   my $childpid = fork();
588   if ($childpid == 0)
589   {
590     $SIG{'INT'} = 'DEFAULT';
591     $SIG{'QUIT'} = 'DEFAULT';
592     $SIG{'TERM'} = 'DEFAULT';
593
594     foreach (values (%reader))
595     {
596       close($_);
597     }
598     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
599     open(STDOUT,">&writer");
600     open(STDERR,">&writer");
601
602     undef $dbh;
603     undef $sth;
604
605     delete $ENV{"GNUPGHOME"};
606     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
607     $ENV{"TASK_QSEQUENCE"} = $id;
608     $ENV{"TASK_SEQUENCE"} = $level;
609     $ENV{"JOB_SCRIPT"} = $Job->{script};
610     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
611       $param =~ tr/a-z/A-Z/;
612       $ENV{"JOB_PARAMETER_$param"} = $value;
613     }
614     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
615     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
616     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
617     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
618     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
619     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
620     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
621
622     $ENV{"GZIP"} = "-n";
623
624     my @srunargs = (
625       "srun",
626       "--nodelist=".$childnode->{name},
627       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
628       "--job-name=$job_id.$id.$$",
629         );
630     my $build_script_to_send = "";
631     my $command =
632         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
633         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
634         ."&& cd $ENV{CRUNCH_TMP} ";
635     if ($build_script)
636     {
637       $build_script_to_send = $build_script;
638       $command .=
639           "&& perl -";
640     }
641     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
642     if ($docker_image)
643     {
644       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
645       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr --cidfile=$ENV{TASK_WORK}/docker.cid ";
646       # Dynamically configure the container to use the host system as its
647       # DNS server.  Get the host's global addresses from the ip command,
648       # and turn them into docker --dns options using gawk.
649       $command .=
650           q{$(ip -o address show scope global |
651               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
652       foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
653       {
654         $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
655       }
656       while (my ($env_key, $env_val) = each %ENV)
657       {
658         if ($env_key =~ /^(JOB|TASK)_/) {
659           $command .= "-e \Q$env_key=$env_val\E ";
660         }
661       }
662       $command .= "\Q$docker_image\E ";
663     } else {
664       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "
665     }
666     $command .= "stdbuf -o0 -e0 ";
667     $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
668     my @execargs = ('bash', '-c', $command);
669     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
670     exit (111);
671   }
672   close("writer");
673   if (!defined $childpid)
674   {
675     close $reader{$id};
676     delete $reader{$id};
677     next;
678   }
679   shift @freeslot;
680   $proc{$childpid} = { jobstep => $id,
681                        time => time,
682                        slot => $childslot,
683                        jobstepname => "$job_id.$id.$childpid",
684                      };
685   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
686   $slot[$childslot]->{pid} = $childpid;
687
688   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
689   Log ($id, "child $childpid started on $childslotname");
690   $Jobstep->{starttime} = time;
691   $Jobstep->{node} = $childnode->{name};
692   $Jobstep->{slotindex} = $childslot;
693   delete $Jobstep->{stderr};
694   delete $Jobstep->{finishtime};
695
696   splice @jobstep_todo, $todo_ptr, 1;
697   --$todo_ptr;
698
699   $progress_is_dirty = 1;
700
701   while (!@freeslot
702          ||
703          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
704   {
705     last THISROUND if $main::please_freeze;
706     if ($main::please_info)
707     {
708       $main::please_info = 0;
709       freeze();
710       collate_output();
711       save_meta(1);
712       update_progress_stats();
713     }
714     my $gotsome
715         = readfrompipes ()
716         + reapchildren ();
717     if (!$gotsome)
718     {
719       check_refresh_wanted();
720       check_squeue();
721       update_progress_stats();
722       select (undef, undef, undef, 0.1);
723     }
724     elsif (time - $progress_stats_updated >= 30)
725     {
726       update_progress_stats();
727     }
728     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
729         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
730     {
731       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
732           .($thisround_failed+$thisround_succeeded)
733           .") -- giving up on this round";
734       Log (undef, $message);
735       last THISROUND;
736     }
737
738     # move slots from freeslot to holdslot (or back to freeslot) if necessary
739     for (my $i=$#freeslot; $i>=0; $i--) {
740       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
741         push @holdslot, (splice @freeslot, $i, 1);
742       }
743     }
744     for (my $i=$#holdslot; $i>=0; $i--) {
745       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
746         push @freeslot, (splice @holdslot, $i, 1);
747       }
748     }
749
750     # give up if no nodes are succeeding
751     if (!grep { $_->{node}->{losing_streak} == 0 &&
752                     $_->{node}->{hold_count} < 4 } @slot) {
753       my $message = "Every node has failed -- giving up on this round";
754       Log (undef, $message);
755       last THISROUND;
756     }
757   }
758 }
759
760
761 push @freeslot, splice @holdslot;
762 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
763
764
765 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
766 while (%proc)
767 {
768   if ($main::please_continue) {
769     $main::please_continue = 0;
770     goto THISROUND;
771   }
772   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
773   readfrompipes ();
774   if (!reapchildren())
775   {
776     check_refresh_wanted();
777     check_squeue();
778     update_progress_stats();
779     select (undef, undef, undef, 0.1);
780     killem (keys %proc) if $main::please_freeze;
781   }
782 }
783
784 update_progress_stats();
785 freeze_if_want_freeze();
786
787
788 if (!defined $main::success)
789 {
790   if (@jobstep_todo &&
791       $thisround_succeeded == 0 &&
792       ($thisround_failed == 0 || $thisround_failed > 4))
793   {
794     my $message = "stop because $thisround_failed tasks failed and none succeeded";
795     Log (undef, $message);
796     $main::success = 0;
797   }
798   if (!@jobstep_todo)
799   {
800     $main::success = 1;
801   }
802 }
803
804 goto ONELEVEL if !defined $main::success;
805
806
807 release_allocation();
808 freeze();
809 my $collated_output = &collate_output();
810
811 if ($job_has_uuid) {
812   $Job->update_attributes('running' => 0,
813                           'success' => $collated_output && $main::success,
814                           'finished_at' => scalar gmtime)
815 }
816
817 if ($collated_output)
818 {
819   eval {
820     open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
821         or die "failed to get collated manifest: $!";
822     # Read the original manifest, and strip permission hints from it,
823     # so we can put the result in a Collection.
824     my @manifest_lines = ();
825     while (my $manifest_line = <$orig_manifest>) {
826       my @words = split(/ /, $manifest_line, -1);
827       foreach my $ii (0..$#words) {
828         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
829           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
830         }
831       }
832       push(@manifest_lines, join(" ", @words));
833     }
834     my $manifest_text = join("", @manifest_lines);
835     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
836       'uuid' => md5_hex($manifest_text),
837       'manifest_text' => $manifest_text,
838     });
839     $Job->update_attributes('output' => $output->{uuid});
840     if ($Job->{'output_is_persistent'}) {
841       $arv->{'links'}->{'create'}->execute('link' => {
842         'tail_kind' => 'arvados#user',
843         'tail_uuid' => $User->{'uuid'},
844         'head_kind' => 'arvados#collection',
845         'head_uuid' => $Job->{'output'},
846         'link_class' => 'resources',
847         'name' => 'wants',
848       });
849     }
850   };
851   if ($@) {
852     Log (undef, "Failed to register output manifest: $@");
853   }
854 }
855
856 Log (undef, "finish");
857
858 save_meta();
859 exit 0;
860
861
862
863 sub update_progress_stats
864 {
865   $progress_stats_updated = time;
866   return if !$progress_is_dirty;
867   my ($todo, $done, $running) = (scalar @jobstep_todo,
868                                  scalar @jobstep_done,
869                                  scalar @slot - scalar @freeslot - scalar @holdslot);
870   $Job->{'tasks_summary'} ||= {};
871   $Job->{'tasks_summary'}->{'todo'} = $todo;
872   $Job->{'tasks_summary'}->{'done'} = $done;
873   $Job->{'tasks_summary'}->{'running'} = $running;
874   if ($job_has_uuid) {
875     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
876   }
877   Log (undef, "status: $done done, $running running, $todo todo");
878   $progress_is_dirty = 0;
879 }
880
881
882
883 sub reapchildren
884 {
885   my $pid = waitpid (-1, WNOHANG);
886   return 0 if $pid <= 0;
887
888   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
889                   . "."
890                   . $slot[$proc{$pid}->{slot}]->{cpu});
891   my $jobstepid = $proc{$pid}->{jobstep};
892   my $elapsed = time - $proc{$pid}->{time};
893   my $Jobstep = $jobstep[$jobstepid];
894
895   my $childstatus = $?;
896   my $exitvalue = $childstatus >> 8;
897   my $exitinfo = sprintf("exit %d signal %d%s",
898                          $exitvalue,
899                          $childstatus & 127,
900                          ($childstatus & 128 ? ' core dump' : ''));
901   $Jobstep->{'arvados_task'}->reload;
902   my $task_success = $Jobstep->{'arvados_task'}->{success};
903
904   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
905
906   if (!defined $task_success) {
907     # task did not indicate one way or the other --> fail
908     $Jobstep->{'arvados_task'}->{success} = 0;
909     $Jobstep->{'arvados_task'}->save;
910     $task_success = 0;
911   }
912
913   if (!$task_success)
914   {
915     my $temporary_fail;
916     $temporary_fail ||= $Jobstep->{node_fail};
917     $temporary_fail ||= ($exitvalue == 111);
918
919     ++$thisround_failed;
920     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
921
922     # Check for signs of a failed or misconfigured node
923     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
924         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
925       # Don't count this against jobstep failure thresholds if this
926       # node is already suspected faulty and srun exited quickly
927       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
928           $elapsed < 5) {
929         Log ($jobstepid, "blaming failure on suspect node " .
930              $slot[$proc{$pid}->{slot}]->{node}->{name});
931         $temporary_fail ||= 1;
932       }
933       ban_node_by_slot($proc{$pid}->{slot});
934     }
935
936     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
937                              ++$Jobstep->{'failures'},
938                              $temporary_fail ? 'temporary ' : 'permanent',
939                              $elapsed));
940
941     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
942       # Give up on this task, and the whole job
943       $main::success = 0;
944       $main::please_freeze = 1;
945     }
946     else {
947       # Put this task back on the todo queue
948       push @jobstep_todo, $jobstepid;
949     }
950     $Job->{'tasks_summary'}->{'failed'}++;
951   }
952   else
953   {
954     ++$thisround_succeeded;
955     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
956     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
957     push @jobstep_done, $jobstepid;
958     Log ($jobstepid, "success in $elapsed seconds");
959   }
960   $Jobstep->{exitcode} = $childstatus;
961   $Jobstep->{finishtime} = time;
962   process_stderr ($jobstepid, $task_success);
963   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
964
965   close $reader{$jobstepid};
966   delete $reader{$jobstepid};
967   delete $slot[$proc{$pid}->{slot}]->{pid};
968   push @freeslot, $proc{$pid}->{slot};
969   delete $proc{$pid};
970
971   # Load new tasks
972   my $newtask_list = [];
973   my $newtask_results;
974   do {
975     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
976       'where' => {
977         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
978       },
979       'order' => 'qsequence',
980       'offset' => scalar(@$newtask_list),
981     );
982     push(@$newtask_list, @{$newtask_results->{items}});
983   } while (@{$newtask_results->{items}});
984   foreach my $arvados_task (@$newtask_list) {
985     my $jobstep = {
986       'level' => $arvados_task->{'sequence'},
987       'failures' => 0,
988       'arvados_task' => $arvados_task
989     };
990     push @jobstep, $jobstep;
991     push @jobstep_todo, $#jobstep;
992   }
993
994   $progress_is_dirty = 1;
995   1;
996 }
997
998 sub check_refresh_wanted
999 {
1000   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1001   if (@stat && $stat[9] > $latest_refresh) {
1002     $latest_refresh = scalar time;
1003     if ($job_has_uuid) {
1004       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1005       for my $attr ('cancelled_at',
1006                     'cancelled_by_user_uuid',
1007                     'cancelled_by_client_uuid') {
1008         $Job->{$attr} = $Job2->{$attr};
1009       }
1010       if ($Job->{'cancelled_at'}) {
1011         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1012              " by user " . $Job->{cancelled_by_user_uuid});
1013         $main::success = 0;
1014         $main::please_freeze = 1;
1015       }
1016     }
1017   }
1018 }
1019
1020 sub check_squeue
1021 {
1022   # return if the kill list was checked <4 seconds ago
1023   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1024   {
1025     return;
1026   }
1027   $squeue_kill_checked = time;
1028
1029   # use killem() on procs whose killtime is reached
1030   for (keys %proc)
1031   {
1032     if (exists $proc{$_}->{killtime}
1033         && $proc{$_}->{killtime} <= time)
1034     {
1035       killem ($_);
1036     }
1037   }
1038
1039   # return if the squeue was checked <60 seconds ago
1040   if (defined $squeue_checked && $squeue_checked > time - 60)
1041   {
1042     return;
1043   }
1044   $squeue_checked = time;
1045
1046   if (!$have_slurm)
1047   {
1048     # here is an opportunity to check for mysterious problems with local procs
1049     return;
1050   }
1051
1052   # get a list of steps still running
1053   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1054   chop @squeue;
1055   if ($squeue[-1] ne "ok")
1056   {
1057     return;
1058   }
1059   pop @squeue;
1060
1061   # which of my jobsteps are running, according to squeue?
1062   my %ok;
1063   foreach (@squeue)
1064   {
1065     if (/^(\d+)\.(\d+) (\S+)/)
1066     {
1067       if ($1 eq $ENV{SLURM_JOBID})
1068       {
1069         $ok{$3} = 1;
1070       }
1071     }
1072   }
1073
1074   # which of my active child procs (>60s old) were not mentioned by squeue?
1075   foreach (keys %proc)
1076   {
1077     if ($proc{$_}->{time} < time - 60
1078         && !exists $ok{$proc{$_}->{jobstepname}}
1079         && !exists $proc{$_}->{killtime})
1080     {
1081       # kill this proc if it hasn't exited in 30 seconds
1082       $proc{$_}->{killtime} = time + 30;
1083     }
1084   }
1085 }
1086
1087
1088 sub release_allocation
1089 {
1090   if ($have_slurm)
1091   {
1092     Log (undef, "release job allocation");
1093     system "scancel $ENV{SLURM_JOBID}";
1094   }
1095 }
1096
1097
1098 sub readfrompipes
1099 {
1100   my $gotsome = 0;
1101   foreach my $job (keys %reader)
1102   {
1103     my $buf;
1104     while (0 < sysread ($reader{$job}, $buf, 8192))
1105     {
1106       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1107       $jobstep[$job]->{stderr} .= $buf;
1108       preprocess_stderr ($job);
1109       if (length ($jobstep[$job]->{stderr}) > 16384)
1110       {
1111         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1112       }
1113       $gotsome = 1;
1114     }
1115   }
1116   return $gotsome;
1117 }
1118
1119
1120 sub preprocess_stderr
1121 {
1122   my $job = shift;
1123
1124   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1125     my $line = $1;
1126     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1127     Log ($job, "stderr $line");
1128     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1129       # whoa.
1130       $main::please_freeze = 1;
1131     }
1132     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1133       $jobstep[$job]->{node_fail} = 1;
1134       ban_node_by_slot($jobstep[$job]->{slotindex});
1135     }
1136   }
1137 }
1138
1139
1140 sub process_stderr
1141 {
1142   my $job = shift;
1143   my $task_success = shift;
1144   preprocess_stderr ($job);
1145
1146   map {
1147     Log ($job, "stderr $_");
1148   } split ("\n", $jobstep[$job]->{stderr});
1149 }
1150
1151 sub fetch_block
1152 {
1153   my $hash = shift;
1154   my ($keep, $child_out, $output_block);
1155
1156   my $cmd = "$arv_cli keep get \Q$hash\E";
1157   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1158   sysread($keep, $output_block, 64 * 1024 * 1024);
1159   close $keep;
1160   return $output_block;
1161 }
1162
1163 sub collate_output
1164 {
1165   Log (undef, "collate");
1166
1167   my ($child_out, $child_in);
1168   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1169   my $joboutput;
1170   for (@jobstep)
1171   {
1172     next if (!exists $_->{'arvados_task'}->{output} ||
1173              !$_->{'arvados_task'}->{'success'} ||
1174              $_->{'exitcode'} != 0);
1175     my $output = $_->{'arvados_task'}->{output};
1176     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1177     {
1178       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1179       print $child_in $output;
1180     }
1181     elsif (@jobstep == 1)
1182     {
1183       $joboutput = $output;
1184       last;
1185     }
1186     elsif (defined (my $outblock = fetch_block ($output)))
1187     {
1188       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1189       print $child_in $outblock;
1190     }
1191     else
1192     {
1193       Log (undef, "XXX fetch_block($output) failed XXX");
1194       $main::success = 0;
1195     }
1196   }
1197   $child_in->close;
1198
1199   if (!defined $joboutput) {
1200     my $s = IO::Select->new($child_out);
1201     if ($s->can_read(120)) {
1202       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1203       chomp($joboutput);
1204     } else {
1205       Log (undef, "timed out reading from 'arv keep put'");
1206     }
1207   }
1208   waitpid($pid, 0);
1209
1210   if ($joboutput)
1211   {
1212     Log (undef, "output $joboutput");
1213     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1214   }
1215   else
1216   {
1217     Log (undef, "output undef");
1218   }
1219   return $joboutput;
1220 }
1221
1222
1223 sub killem
1224 {
1225   foreach (@_)
1226   {
1227     my $sig = 2;                # SIGINT first
1228     if (exists $proc{$_}->{"sent_$sig"} &&
1229         time - $proc{$_}->{"sent_$sig"} > 4)
1230     {
1231       $sig = 15;                # SIGTERM if SIGINT doesn't work
1232     }
1233     if (exists $proc{$_}->{"sent_$sig"} &&
1234         time - $proc{$_}->{"sent_$sig"} > 4)
1235     {
1236       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1237     }
1238     if (!exists $proc{$_}->{"sent_$sig"})
1239     {
1240       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1241       kill $sig, $_;
1242       select (undef, undef, undef, 0.1);
1243       if ($sig == 2)
1244       {
1245         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1246       }
1247       $proc{$_}->{"sent_$sig"} = time;
1248       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1249     }
1250   }
1251 }
1252
1253
1254 sub fhbits
1255 {
1256   my($bits);
1257   for (@_) {
1258     vec($bits,fileno($_),1) = 1;
1259   }
1260   $bits;
1261 }
1262
1263
1264 sub Log                         # ($jobstep_id, $logmessage)
1265 {
1266   if ($_[1] =~ /\n/) {
1267     for my $line (split (/\n/, $_[1])) {
1268       Log ($_[0], $line);
1269     }
1270     return;
1271   }
1272   my $fh = select STDERR; $|=1; select $fh;
1273   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1274   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1275   $message .= "\n";
1276   my $datetime;
1277   if ($local_logfile || -t STDERR) {
1278     my @gmtime = gmtime;
1279     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1280                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1281   }
1282   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1283
1284   if ($local_logfile) {
1285     print $local_logfile $datetime . " " . $message;
1286   }
1287 }
1288
1289
1290 sub croak
1291 {
1292   my ($package, $file, $line) = caller;
1293   my $message = "@_ at $file line $line\n";
1294   Log (undef, $message);
1295   freeze() if @jobstep_todo;
1296   collate_output() if @jobstep_todo;
1297   cleanup();
1298   save_meta() if $local_logfile;
1299   die;
1300 }
1301
1302
1303 sub cleanup
1304 {
1305   return if !$job_has_uuid;
1306   $Job->update_attributes('running' => 0,
1307                           'success' => 0,
1308                           'finished_at' => scalar gmtime);
1309 }
1310
1311
1312 sub save_meta
1313 {
1314   my $justcheckpoint = shift; # false if this will be the last meta saved
1315   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1316
1317   $local_logfile->flush;
1318   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1319       . quotemeta($local_logfile->filename);
1320   my $loglocator = `$cmd`;
1321   die "system $cmd failed: $?" if $?;
1322   chomp($loglocator);
1323
1324   $local_logfile = undef;   # the temp file is automatically deleted
1325   Log (undef, "log manifest is $loglocator");
1326   $Job->{'log'} = $loglocator;
1327   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1328 }
1329
1330
1331 sub freeze_if_want_freeze
1332 {
1333   if ($main::please_freeze)
1334   {
1335     release_allocation();
1336     if (@_)
1337     {
1338       # kill some srun procs before freeze+stop
1339       map { $proc{$_} = {} } @_;
1340       while (%proc)
1341       {
1342         killem (keys %proc);
1343         select (undef, undef, undef, 0.1);
1344         my $died;
1345         while (($died = waitpid (-1, WNOHANG)) > 0)
1346         {
1347           delete $proc{$died};
1348         }
1349       }
1350     }
1351     freeze();
1352     collate_output();
1353     cleanup();
1354     save_meta();
1355     exit 0;
1356   }
1357 }
1358
1359
1360 sub freeze
1361 {
1362   Log (undef, "Freeze not implemented");
1363   return;
1364 }
1365
1366
1367 sub thaw
1368 {
1369   croak ("Thaw not implemented");
1370 }
1371
1372
1373 sub freezequote
1374 {
1375   my $s = shift;
1376   $s =~ s/\\/\\\\/g;
1377   $s =~ s/\n/\\n/g;
1378   return $s;
1379 }
1380
1381
1382 sub freezeunquote
1383 {
1384   my $s = shift;
1385   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1386   return $s;
1387 }
1388
1389
1390 sub srun
1391 {
1392   my $srunargs = shift;
1393   my $execargs = shift;
1394   my $opts = shift || {};
1395   my $stdin = shift;
1396   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1397   print STDERR (join (" ",
1398                       map { / / ? "'$_'" : $_ }
1399                       (@$args)),
1400                 "\n")
1401       if $ENV{CRUNCH_DEBUG};
1402
1403   if (defined $stdin) {
1404     my $child = open STDIN, "-|";
1405     defined $child or die "no fork: $!";
1406     if ($child == 0) {
1407       print $stdin or die $!;
1408       close STDOUT or die $!;
1409       exit 0;
1410     }
1411   }
1412
1413   return system (@$args) if $opts->{fork};
1414
1415   exec @$args;
1416   warn "ENV size is ".length(join(" ",%ENV));
1417   die "exec failed: $!: @$args";
1418 }
1419
1420
1421 sub ban_node_by_slot {
1422   # Don't start any new jobsteps on this node for 60 seconds
1423   my $slotid = shift;
1424   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1425   $slot[$slotid]->{node}->{hold_count}++;
1426   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1427 }
1428
1429 sub must_lock_now
1430 {
1431   my ($lockfile, $error_message) = @_;
1432   open L, ">", $lockfile or croak("$lockfile: $!");
1433   if (!flock L, LOCK_EX|LOCK_NB) {
1434     croak("Can't lock $lockfile: $error_message\n");
1435   }
1436 }
1437
1438 __DATA__
1439 #!/usr/bin/perl
1440
1441 # checkout-and-build
1442
1443 use Fcntl ':flock';
1444
1445 my $destdir = $ENV{"CRUNCH_SRC"};
1446 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1447 my $repo = $ENV{"CRUNCH_SRC_URL"};
1448
1449 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1450 flock L, LOCK_EX;
1451 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1452     exit 0;
1453 }
1454
1455 unlink "$destdir.commit";
1456 open STDOUT, ">", "$destdir.log";
1457 open STDERR, ">&STDOUT";
1458
1459 mkdir $destdir;
1460 my @git_archive_data = <DATA>;
1461 if (@git_archive_data) {
1462   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1463   print TARX @git_archive_data;
1464   if(!close(TARX)) {
1465     die "'tar -C $destdir -xf -' exited $?: $!";
1466   }
1467 }
1468
1469 my $pwd;
1470 chomp ($pwd = `pwd`);
1471 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1472 mkdir $install_dir;
1473
1474 for my $src_path ("$destdir/arvados/sdk/python") {
1475   if (-d $src_path) {
1476     shell_or_die ("virtualenv", $install_dir);
1477     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1478   }
1479 }
1480
1481 if (-e "$destdir/crunch_scripts/install") {
1482     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1483 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1484     # Old version
1485     shell_or_die ("./tests/autotests.sh", $install_dir);
1486 } elsif (-e "./install.sh") {
1487     shell_or_die ("./install.sh", $install_dir);
1488 }
1489
1490 if ($commit) {
1491     unlink "$destdir.commit.new";
1492     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1493     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1494 }
1495
1496 close L;
1497
1498 exit 0;
1499
1500 sub shell_or_die
1501 {
1502   if ($ENV{"DEBUG"}) {
1503     print STDERR "@_\n";
1504   }
1505   system (@_) == 0
1506       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1507 }
1508
1509 __DATA__