Merge branch 'master' into 2895-no-more-redis
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $arv_cli;
100
101 if (defined $ENV{"ARV_CLI"}) {
102   $arv_cli = $ENV{"ARV_CLI"};
103 }
104 else {
105   $arv_cli = 'arv';
106 }
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     if ($Job->{'is_locked_by_uuid'}) {
156       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157     }
158     if ($Job->{'success'} ne undef) {
159       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160     }
161     if ($Job->{'running'}) {
162       croak("Job 'running' flag is already set");
163     }
164     if ($Job->{'started_at'}) {
165       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
166     }
167   }
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172
173   if (!$resume_stash)
174   {
175     map { croak ("No $_ specified") unless $Job->{$_} }
176     qw(script script_version script_parameters);
177   }
178
179   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180   $Job->{'started_at'} = gmtime;
181
182   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183
184   $job_has_uuid = 1;
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271
272 my $jobmanager_id;
273 if ($job_has_uuid)
274 {
275   # Claim this job, and make sure nobody else does
276   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278     croak("Error while updating / locking job");
279   }
280   $Job->update_attributes('started_at' => scalar gmtime,
281                           'running' => 1,
282                           'success' => undef,
283                           'tasks_summary' => { 'failed' => 0,
284                                                'todo' => 1,
285                                                'running' => 0,
286                                                'done' => 0 });
287 }
288
289
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
304
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
308
309
310 my @jobstep;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
315 my $squeue_checked;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
319
320
321
322 if (defined $Job->{thawedfromkey})
323 {
324   thaw ($Job->{thawedfromkey});
325 }
326 else
327 {
328   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329     'job_uuid' => $Job->{'uuid'},
330     'sequence' => 0,
331     'qsequence' => 0,
332     'parameters' => {},
333                                                           });
334   push @jobstep, { 'level' => 0,
335                    'failures' => 0,
336                    'arvados_task' => $first_task,
337                  };
338   push @jobstep_todo, 0;
339 }
340
341
342 if (!$have_slurm)
343 {
344   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
345 }
346
347
348 my $build_script;
349
350
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if ($skip_install)
355 {
356   if (!defined $no_clear_tmp) {
357     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358     system($clear_tmp_cmd) == 0
359         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360   }
361   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363     if (-d $src_path) {
364       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367           == 0
368           or croak ("setup.py in $src_path failed: exit ".($?>>8));
369     }
370   }
371 }
372 else
373 {
374   do {
375     local $/ = undef;
376     $build_script = <DATA>;
377   };
378   Log (undef, "Install revision ".$Job->{script_version});
379   my $nodelist = join(",", @node);
380
381   if (!defined $no_clear_tmp) {
382     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383
384     my $cleanpid = fork();
385     if ($cleanpid == 0)
386     {
387       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
389       exit (1);
390     }
391     while (1)
392     {
393       last if $cleanpid == waitpid (-1, WNOHANG);
394       freeze_if_want_freeze ($cleanpid);
395       select (undef, undef, undef, 0.1);
396     }
397     Log (undef, "Clean-work-dir exited $?");
398   }
399
400   # Install requested code version
401
402   my @execargs;
403   my @srunargs = ("srun",
404                   "--nodelist=$nodelist",
405                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406
407   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
409
410   my $commit;
411   my $git_archive;
412   my $treeish = $Job->{'script_version'};
413
414   # If we're running under crunch-dispatch, it will have pulled the
415   # appropriate source tree into its own repository, and given us that
416   # repo's path as $git_dir. If we're running a "local" job, and a
417   # script_version was specified, it's up to the user to provide the
418   # full path to a local repository in Job->{repository}.
419   #
420   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421   # git-archive --remote where appropriate.
422   #
423   # TODO: Accept a locally-hosted Arvados repository by name or
424   # UUID. Use arvados.v1.repositories.list or .get to figure out the
425   # appropriate fetch-url.
426   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427
428   $ENV{"CRUNCH_SRC_URL"} = $repo;
429
430   if (-d "$repo/.git") {
431     # We were given a working directory, but we are only interested in
432     # the index.
433     $repo = "$repo/.git";
434   }
435
436   # If this looks like a subversion r#, look for it in git-svn commit messages
437
438   if ($treeish =~ m{^\d{1,4}$}) {
439     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440     chomp $gitlog;
441     if ($gitlog =~ /^[a-f0-9]{40}$/) {
442       $commit = $gitlog;
443       Log (undef, "Using commit $commit for script_version $treeish");
444     }
445   }
446
447   # If that didn't work, try asking git to look it up as a tree-ish.
448
449   if (!defined $commit) {
450     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451     chomp $found;
452     if ($found =~ /^[0-9a-f]{40}$/s) {
453       $commit = $found;
454       if ($commit ne $treeish) {
455         # Make sure we record the real commit id in the database,
456         # frozentokey, logs, etc. -- instead of an abbreviation or a
457         # branch name which can become ambiguous or point to a
458         # different commit in the future.
459         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460         Log (undef, "Using commit $commit for tree-ish $treeish");
461         if ($commit ne $treeish) {
462           $Job->{'script_version'} = $commit;
463           !$job_has_uuid or
464               $Job->update_attributes('script_version' => $commit) or
465               croak("Error while updating job");
466         }
467       }
468     }
469   }
470
471   if (defined $commit) {
472     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473     @execargs = ("sh", "-c",
474                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476   }
477   else {
478     croak ("could not figure out commit id for $treeish");
479   }
480
481   my $installpid = fork();
482   if ($installpid == 0)
483   {
484     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
485     exit (1);
486   }
487   while (1)
488   {
489     last if $installpid == waitpid (-1, WNOHANG);
490     freeze_if_want_freeze ($installpid);
491     select (undef, undef, undef, 0.1);
492   }
493   Log (undef, "Install exited $?");
494 }
495
496 if (!$have_slurm)
497 {
498   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 }
501
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";
505 if ($docker_image) {
506   my $docker_pid = fork();
507   if ($docker_pid == 0)
508   {
509     srun (["srun", "--nodelist=" . join(' ', @node)],
510           [$docker_bin, 'pull', $docker_image]);
511     exit ($?);
512   }
513   while (1)
514   {
515     last if $docker_pid == waitpid (-1, WNOHANG);
516     freeze_if_want_freeze ($docker_pid);
517     select (undef, undef, undef, 0.1);
518   }
519   # If the Docker image was specified as a hash, pull will fail.
520   # Ignore that error.  We'll see what happens when we try to run later.
521   if (($? != 0) && ($docker_image !~ /^[0-9a-fA-F]{5,64}$/))
522   {
523     croak("Installing Docker image $docker_image returned exit code $?");
524   }
525 }
526
527 foreach (qw (script script_version script_parameters runtime_constraints))
528 {
529   Log (undef,
530        "$_ " .
531        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
532 }
533 foreach (split (/\n/, $Job->{knobs}))
534 {
535   Log (undef, "knob " . $_);
536 }
537
538
539
540 $main::success = undef;
541
542
543
544 ONELEVEL:
545
546 my $thisround_succeeded = 0;
547 my $thisround_failed = 0;
548 my $thisround_failed_multiple = 0;
549
550 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
551                        or $a <=> $b } @jobstep_todo;
552 my $level = $jobstep[$jobstep_todo[0]]->{level};
553 Log (undef, "start level $level");
554
555
556
557 my %proc;
558 my @freeslot = (0..$#slot);
559 my @holdslot;
560 my %reader;
561 my $progress_is_dirty = 1;
562 my $progress_stats_updated = 0;
563
564 update_progress_stats();
565
566
567
568 THISROUND:
569 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
570 {
571   my $id = $jobstep_todo[$todo_ptr];
572   my $Jobstep = $jobstep[$id];
573   if ($Jobstep->{level} != $level)
574   {
575     next;
576   }
577
578   pipe $reader{$id}, "writer" or croak ($!);
579   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
580   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
581
582   my $childslot = $freeslot[0];
583   my $childnode = $slot[$childslot]->{node};
584   my $childslotname = join (".",
585                             $slot[$childslot]->{node}->{name},
586                             $slot[$childslot]->{cpu});
587   my $childpid = fork();
588   if ($childpid == 0)
589   {
590     $SIG{'INT'} = 'DEFAULT';
591     $SIG{'QUIT'} = 'DEFAULT';
592     $SIG{'TERM'} = 'DEFAULT';
593
594     foreach (values (%reader))
595     {
596       close($_);
597     }
598     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
599     open(STDOUT,">&writer");
600     open(STDERR,">&writer");
601
602     undef $dbh;
603     undef $sth;
604
605     delete $ENV{"GNUPGHOME"};
606     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
607     $ENV{"TASK_QSEQUENCE"} = $id;
608     $ENV{"TASK_SEQUENCE"} = $level;
609     $ENV{"JOB_SCRIPT"} = $Job->{script};
610     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
611       $param =~ tr/a-z/A-Z/;
612       $ENV{"JOB_PARAMETER_$param"} = $value;
613     }
614     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
615     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
616     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
617     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
618     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
619     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
620     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
621
622     $ENV{"GZIP"} = "-n";
623
624     my @srunargs = (
625       "srun",
626       "--nodelist=".$childnode->{name},
627       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
628       "--job-name=$job_id.$id.$$",
629         );
630     my $build_script_to_send = "";
631     my $command =
632         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
633         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
634         ."&& cd $ENV{CRUNCH_TMP} ";
635     if ($build_script)
636     {
637       $build_script_to_send = $build_script;
638       $command .=
639           "&& perl -";
640     }
641     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
642     if ($docker_image)
643     {
644       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr ";
645       # Dynamically configure the container to use the host system as its
646       # DNS server.  Get the host's global addresses from the ip command,
647       # and turn them into docker --dns options using gawk.
648       $command .=
649           q{$(ip -o address show scope global |
650               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
651       foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
652       {
653         $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
654       }
655       while (my ($env_key, $env_val) = each %ENV)
656       {
657         $command .= "-e \Q$env_key=$env_val\E ";
658       }
659       $command .= "\Q$docker_image\E ";
660     }
661     $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
662     my @execargs = ('bash', '-c', $command);
663     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
664     exit (111);
665   }
666   close("writer");
667   if (!defined $childpid)
668   {
669     close $reader{$id};
670     delete $reader{$id};
671     next;
672   }
673   shift @freeslot;
674   $proc{$childpid} = { jobstep => $id,
675                        time => time,
676                        slot => $childslot,
677                        jobstepname => "$job_id.$id.$childpid",
678                      };
679   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
680   $slot[$childslot]->{pid} = $childpid;
681
682   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
683   Log ($id, "child $childpid started on $childslotname");
684   $Jobstep->{starttime} = time;
685   $Jobstep->{node} = $childnode->{name};
686   $Jobstep->{slotindex} = $childslot;
687   delete $Jobstep->{stderr};
688   delete $Jobstep->{finishtime};
689
690   splice @jobstep_todo, $todo_ptr, 1;
691   --$todo_ptr;
692
693   $progress_is_dirty = 1;
694
695   while (!@freeslot
696          ||
697          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
698   {
699     last THISROUND if $main::please_freeze;
700     if ($main::please_info)
701     {
702       $main::please_info = 0;
703       freeze();
704       collate_output();
705       save_meta(1);
706       update_progress_stats();
707     }
708     my $gotsome
709         = readfrompipes ()
710         + reapchildren ();
711     if (!$gotsome)
712     {
713       check_refresh_wanted();
714       check_squeue();
715       update_progress_stats();
716       select (undef, undef, undef, 0.1);
717     }
718     elsif (time - $progress_stats_updated >= 30)
719     {
720       update_progress_stats();
721     }
722     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
723         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
724     {
725       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
726           .($thisround_failed+$thisround_succeeded)
727           .") -- giving up on this round";
728       Log (undef, $message);
729       last THISROUND;
730     }
731
732     # move slots from freeslot to holdslot (or back to freeslot) if necessary
733     for (my $i=$#freeslot; $i>=0; $i--) {
734       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
735         push @holdslot, (splice @freeslot, $i, 1);
736       }
737     }
738     for (my $i=$#holdslot; $i>=0; $i--) {
739       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
740         push @freeslot, (splice @holdslot, $i, 1);
741       }
742     }
743
744     # give up if no nodes are succeeding
745     if (!grep { $_->{node}->{losing_streak} == 0 &&
746                     $_->{node}->{hold_count} < 4 } @slot) {
747       my $message = "Every node has failed -- giving up on this round";
748       Log (undef, $message);
749       last THISROUND;
750     }
751   }
752 }
753
754
755 push @freeslot, splice @holdslot;
756 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
757
758
759 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
760 while (%proc)
761 {
762   if ($main::please_continue) {
763     $main::please_continue = 0;
764     goto THISROUND;
765   }
766   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
767   readfrompipes ();
768   if (!reapchildren())
769   {
770     check_refresh_wanted();
771     check_squeue();
772     update_progress_stats();
773     select (undef, undef, undef, 0.1);
774     killem (keys %proc) if $main::please_freeze;
775   }
776 }
777
778 update_progress_stats();
779 freeze_if_want_freeze();
780
781
782 if (!defined $main::success)
783 {
784   if (@jobstep_todo &&
785       $thisround_succeeded == 0 &&
786       ($thisround_failed == 0 || $thisround_failed > 4))
787   {
788     my $message = "stop because $thisround_failed tasks failed and none succeeded";
789     Log (undef, $message);
790     $main::success = 0;
791   }
792   if (!@jobstep_todo)
793   {
794     $main::success = 1;
795   }
796 }
797
798 goto ONELEVEL if !defined $main::success;
799
800
801 release_allocation();
802 freeze();
803 my $collated_output = &collate_output();
804
805 if ($job_has_uuid) {
806   $Job->update_attributes('running' => 0,
807                           'success' => $collated_output && $main::success,
808                           'finished_at' => scalar gmtime)
809 }
810
811 if ($collated_output)
812 {
813   eval {
814     open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
815         or die "failed to get collated manifest: $!";
816     # Read the original manifest, and strip permission hints from it,
817     # so we can put the result in a Collection.
818     my @manifest_lines = ();
819     while (my $manifest_line = <$orig_manifest>) {
820       my @words = split(/ /, $manifest_line, -1);
821       foreach my $ii (0..$#words) {
822         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
823           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
824         }
825       }
826       push(@manifest_lines, join(" ", @words));
827     }
828     my $manifest_text = join("", @manifest_lines);
829     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
830       'uuid' => md5_hex($manifest_text),
831       'manifest_text' => $manifest_text,
832     });
833     $Job->update_attributes('output' => $output->{uuid});
834     if ($Job->{'output_is_persistent'}) {
835       $arv->{'links'}->{'create'}->execute('link' => {
836         'tail_kind' => 'arvados#user',
837         'tail_uuid' => $User->{'uuid'},
838         'head_kind' => 'arvados#collection',
839         'head_uuid' => $Job->{'output'},
840         'link_class' => 'resources',
841         'name' => 'wants',
842       });
843     }
844   };
845   if ($@) {
846     Log (undef, "Failed to register output manifest: $@");
847   }
848 }
849
850 Log (undef, "finish");
851
852 save_meta();
853 exit 0;
854
855
856
857 sub update_progress_stats
858 {
859   $progress_stats_updated = time;
860   return if !$progress_is_dirty;
861   my ($todo, $done, $running) = (scalar @jobstep_todo,
862                                  scalar @jobstep_done,
863                                  scalar @slot - scalar @freeslot - scalar @holdslot);
864   $Job->{'tasks_summary'} ||= {};
865   $Job->{'tasks_summary'}->{'todo'} = $todo;
866   $Job->{'tasks_summary'}->{'done'} = $done;
867   $Job->{'tasks_summary'}->{'running'} = $running;
868   if ($job_has_uuid) {
869     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
870   }
871   Log (undef, "status: $done done, $running running, $todo todo");
872   $progress_is_dirty = 0;
873 }
874
875
876
877 sub reapchildren
878 {
879   my $pid = waitpid (-1, WNOHANG);
880   return 0 if $pid <= 0;
881
882   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
883                   . "."
884                   . $slot[$proc{$pid}->{slot}]->{cpu});
885   my $jobstepid = $proc{$pid}->{jobstep};
886   my $elapsed = time - $proc{$pid}->{time};
887   my $Jobstep = $jobstep[$jobstepid];
888
889   my $childstatus = $?;
890   my $exitvalue = $childstatus >> 8;
891   my $exitinfo = sprintf("exit %d signal %d%s",
892                          $exitvalue,
893                          $childstatus & 127,
894                          ($childstatus & 128 ? ' core dump' : ''));
895   $Jobstep->{'arvados_task'}->reload;
896   my $task_success = $Jobstep->{'arvados_task'}->{success};
897
898   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
899
900   if (!defined $task_success) {
901     # task did not indicate one way or the other --> fail
902     $Jobstep->{'arvados_task'}->{success} = 0;
903     $Jobstep->{'arvados_task'}->save;
904     $task_success = 0;
905   }
906
907   if (!$task_success)
908   {
909     my $temporary_fail;
910     $temporary_fail ||= $Jobstep->{node_fail};
911     $temporary_fail ||= ($exitvalue == 111);
912
913     ++$thisround_failed;
914     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
915
916     # Check for signs of a failed or misconfigured node
917     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
918         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
919       # Don't count this against jobstep failure thresholds if this
920       # node is already suspected faulty and srun exited quickly
921       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
922           $elapsed < 5) {
923         Log ($jobstepid, "blaming failure on suspect node " .
924              $slot[$proc{$pid}->{slot}]->{node}->{name});
925         $temporary_fail ||= 1;
926       }
927       ban_node_by_slot($proc{$pid}->{slot});
928     }
929
930     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
931                              ++$Jobstep->{'failures'},
932                              $temporary_fail ? 'temporary ' : 'permanent',
933                              $elapsed));
934
935     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
936       # Give up on this task, and the whole job
937       $main::success = 0;
938       $main::please_freeze = 1;
939     }
940     else {
941       # Put this task back on the todo queue
942       push @jobstep_todo, $jobstepid;
943     }
944     $Job->{'tasks_summary'}->{'failed'}++;
945   }
946   else
947   {
948     ++$thisround_succeeded;
949     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
950     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
951     push @jobstep_done, $jobstepid;
952     Log ($jobstepid, "success in $elapsed seconds");
953   }
954   $Jobstep->{exitcode} = $childstatus;
955   $Jobstep->{finishtime} = time;
956   process_stderr ($jobstepid, $task_success);
957   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
958
959   close $reader{$jobstepid};
960   delete $reader{$jobstepid};
961   delete $slot[$proc{$pid}->{slot}]->{pid};
962   push @freeslot, $proc{$pid}->{slot};
963   delete $proc{$pid};
964
965   # Load new tasks
966   my $newtask_list = [];
967   my $newtask_results;
968   do {
969     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
970       'where' => {
971         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
972       },
973       'order' => 'qsequence',
974       'offset' => scalar(@$newtask_list),
975     );
976     push(@$newtask_list, @{$newtask_results->{items}});
977   } while (@{$newtask_results->{items}});
978   foreach my $arvados_task (@$newtask_list) {
979     my $jobstep = {
980       'level' => $arvados_task->{'sequence'},
981       'failures' => 0,
982       'arvados_task' => $arvados_task
983     };
984     push @jobstep, $jobstep;
985     push @jobstep_todo, $#jobstep;
986   }
987
988   $progress_is_dirty = 1;
989   1;
990 }
991
992 sub check_refresh_wanted
993 {
994   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
995   if (@stat && $stat[9] > $latest_refresh) {
996     $latest_refresh = scalar time;
997     if ($job_has_uuid) {
998       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
999       for my $attr ('cancelled_at',
1000                     'cancelled_by_user_uuid',
1001                     'cancelled_by_client_uuid') {
1002         $Job->{$attr} = $Job2->{$attr};
1003       }
1004       if ($Job->{'cancelled_at'}) {
1005         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1006              " by user " . $Job->{cancelled_by_user_uuid});
1007         $main::success = 0;
1008         $main::please_freeze = 1;
1009       }
1010     }
1011   }
1012 }
1013
1014 sub check_squeue
1015 {
1016   # return if the kill list was checked <4 seconds ago
1017   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1018   {
1019     return;
1020   }
1021   $squeue_kill_checked = time;
1022
1023   # use killem() on procs whose killtime is reached
1024   for (keys %proc)
1025   {
1026     if (exists $proc{$_}->{killtime}
1027         && $proc{$_}->{killtime} <= time)
1028     {
1029       killem ($_);
1030     }
1031   }
1032
1033   # return if the squeue was checked <60 seconds ago
1034   if (defined $squeue_checked && $squeue_checked > time - 60)
1035   {
1036     return;
1037   }
1038   $squeue_checked = time;
1039
1040   if (!$have_slurm)
1041   {
1042     # here is an opportunity to check for mysterious problems with local procs
1043     return;
1044   }
1045
1046   # get a list of steps still running
1047   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1048   chop @squeue;
1049   if ($squeue[-1] ne "ok")
1050   {
1051     return;
1052   }
1053   pop @squeue;
1054
1055   # which of my jobsteps are running, according to squeue?
1056   my %ok;
1057   foreach (@squeue)
1058   {
1059     if (/^(\d+)\.(\d+) (\S+)/)
1060     {
1061       if ($1 eq $ENV{SLURM_JOBID})
1062       {
1063         $ok{$3} = 1;
1064       }
1065     }
1066   }
1067
1068   # which of my active child procs (>60s old) were not mentioned by squeue?
1069   foreach (keys %proc)
1070   {
1071     if ($proc{$_}->{time} < time - 60
1072         && !exists $ok{$proc{$_}->{jobstepname}}
1073         && !exists $proc{$_}->{killtime})
1074     {
1075       # kill this proc if it hasn't exited in 30 seconds
1076       $proc{$_}->{killtime} = time + 30;
1077     }
1078   }
1079 }
1080
1081
1082 sub release_allocation
1083 {
1084   if ($have_slurm)
1085   {
1086     Log (undef, "release job allocation");
1087     system "scancel $ENV{SLURM_JOBID}";
1088   }
1089 }
1090
1091
1092 sub readfrompipes
1093 {
1094   my $gotsome = 0;
1095   foreach my $job (keys %reader)
1096   {
1097     my $buf;
1098     while (0 < sysread ($reader{$job}, $buf, 8192))
1099     {
1100       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1101       $jobstep[$job]->{stderr} .= $buf;
1102       preprocess_stderr ($job);
1103       if (length ($jobstep[$job]->{stderr}) > 16384)
1104       {
1105         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1106       }
1107       $gotsome = 1;
1108     }
1109   }
1110   return $gotsome;
1111 }
1112
1113
1114 sub preprocess_stderr
1115 {
1116   my $job = shift;
1117
1118   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1119     my $line = $1;
1120     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1121     Log ($job, "stderr $line");
1122     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1123       # whoa.
1124       $main::please_freeze = 1;
1125     }
1126     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1127       $jobstep[$job]->{node_fail} = 1;
1128       ban_node_by_slot($jobstep[$job]->{slotindex});
1129     }
1130   }
1131 }
1132
1133
1134 sub process_stderr
1135 {
1136   my $job = shift;
1137   my $task_success = shift;
1138   preprocess_stderr ($job);
1139
1140   map {
1141     Log ($job, "stderr $_");
1142   } split ("\n", $jobstep[$job]->{stderr});
1143 }
1144
1145 sub fetch_block
1146 {
1147   my $hash = shift;
1148   my ($keep, $child_out, $output_block);
1149
1150   my $cmd = "$arv_cli keep get \Q$hash\E";
1151   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1152   sysread($keep, $output_block, 64 * 1024 * 1024);
1153   close $keep;
1154   return $output_block;
1155 }
1156
1157 sub collate_output
1158 {
1159   Log (undef, "collate");
1160
1161   my ($child_out, $child_in);
1162   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1163   my $joboutput;
1164   for (@jobstep)
1165   {
1166     next if (!exists $_->{'arvados_task'}->{output} ||
1167              !$_->{'arvados_task'}->{'success'} ||
1168              $_->{'exitcode'} != 0);
1169     my $output = $_->{'arvados_task'}->{output};
1170     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1171     {
1172       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1173       print $child_in $output;
1174     }
1175     elsif (@jobstep == 1)
1176     {
1177       $joboutput = $output;
1178       last;
1179     }
1180     elsif (defined (my $outblock = fetch_block ($output)))
1181     {
1182       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1183       print $child_in $outblock;
1184     }
1185     else
1186     {
1187       Log (undef, "XXX fetch_block($output) failed XXX");
1188       $main::success = 0;
1189     }
1190   }
1191   $child_in->close;
1192
1193   if (!defined $joboutput) {
1194     my $s = IO::Select->new($child_out);
1195     if ($s->can_read(120)) {
1196       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1197       chomp($joboutput);
1198     } else {
1199       Log (undef, "timed out reading from 'arv keep put'");
1200     }
1201   }
1202   waitpid($pid, 0);
1203
1204   if ($joboutput)
1205   {
1206     Log (undef, "output $joboutput");
1207     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1208   }
1209   else
1210   {
1211     Log (undef, "output undef");
1212   }
1213   return $joboutput;
1214 }
1215
1216
1217 sub killem
1218 {
1219   foreach (@_)
1220   {
1221     my $sig = 2;                # SIGINT first
1222     if (exists $proc{$_}->{"sent_$sig"} &&
1223         time - $proc{$_}->{"sent_$sig"} > 4)
1224     {
1225       $sig = 15;                # SIGTERM if SIGINT doesn't work
1226     }
1227     if (exists $proc{$_}->{"sent_$sig"} &&
1228         time - $proc{$_}->{"sent_$sig"} > 4)
1229     {
1230       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1231     }
1232     if (!exists $proc{$_}->{"sent_$sig"})
1233     {
1234       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1235       kill $sig, $_;
1236       select (undef, undef, undef, 0.1);
1237       if ($sig == 2)
1238       {
1239         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1240       }
1241       $proc{$_}->{"sent_$sig"} = time;
1242       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1243     }
1244   }
1245 }
1246
1247
1248 sub fhbits
1249 {
1250   my($bits);
1251   for (@_) {
1252     vec($bits,fileno($_),1) = 1;
1253   }
1254   $bits;
1255 }
1256
1257
1258 sub Log                         # ($jobstep_id, $logmessage)
1259 {
1260   if ($_[1] =~ /\n/) {
1261     for my $line (split (/\n/, $_[1])) {
1262       Log ($_[0], $line);
1263     }
1264     return;
1265   }
1266   my $fh = select STDERR; $|=1; select $fh;
1267   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1268   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1269   $message .= "\n";
1270   my $datetime;
1271   if ($local_logfile || -t STDERR) {
1272     my @gmtime = gmtime;
1273     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1274                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1275   }
1276   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1277
1278   if ($local_logfile) {
1279     print $local_logfile $datetime . " " . $message;
1280   }
1281 }
1282
1283
1284 sub croak
1285 {
1286   my ($package, $file, $line) = caller;
1287   my $message = "@_ at $file line $line\n";
1288   Log (undef, $message);
1289   freeze() if @jobstep_todo;
1290   collate_output() if @jobstep_todo;
1291   cleanup();
1292   save_meta() if $local_logfile;
1293   die;
1294 }
1295
1296
1297 sub cleanup
1298 {
1299   return if !$job_has_uuid;
1300   $Job->update_attributes('running' => 0,
1301                           'success' => 0,
1302                           'finished_at' => scalar gmtime);
1303 }
1304
1305
1306 sub save_meta
1307 {
1308   my $justcheckpoint = shift; # false if this will be the last meta saved
1309   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1310
1311   $local_logfile->flush;
1312   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1313       . quotemeta($local_logfile->filename);
1314   my $loglocator = `$cmd`;
1315   die "system $cmd failed: $?" if $?;
1316   chomp($loglocator);
1317
1318   $local_logfile = undef;   # the temp file is automatically deleted
1319   Log (undef, "log manifest is $loglocator");
1320   $Job->{'log'} = $loglocator;
1321   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1322 }
1323
1324
1325 sub freeze_if_want_freeze
1326 {
1327   if ($main::please_freeze)
1328   {
1329     release_allocation();
1330     if (@_)
1331     {
1332       # kill some srun procs before freeze+stop
1333       map { $proc{$_} = {} } @_;
1334       while (%proc)
1335       {
1336         killem (keys %proc);
1337         select (undef, undef, undef, 0.1);
1338         my $died;
1339         while (($died = waitpid (-1, WNOHANG)) > 0)
1340         {
1341           delete $proc{$died};
1342         }
1343       }
1344     }
1345     freeze();
1346     collate_output();
1347     cleanup();
1348     save_meta();
1349     exit 0;
1350   }
1351 }
1352
1353
1354 sub freeze
1355 {
1356   Log (undef, "Freeze not implemented");
1357   return;
1358 }
1359
1360
1361 sub thaw
1362 {
1363   croak ("Thaw not implemented");
1364 }
1365
1366
1367 sub freezequote
1368 {
1369   my $s = shift;
1370   $s =~ s/\\/\\\\/g;
1371   $s =~ s/\n/\\n/g;
1372   return $s;
1373 }
1374
1375
1376 sub freezeunquote
1377 {
1378   my $s = shift;
1379   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1380   return $s;
1381 }
1382
1383
1384 sub srun
1385 {
1386   my $srunargs = shift;
1387   my $execargs = shift;
1388   my $opts = shift || {};
1389   my $stdin = shift;
1390   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1391   print STDERR (join (" ",
1392                       map { / / ? "'$_'" : $_ }
1393                       (@$args)),
1394                 "\n")
1395       if $ENV{CRUNCH_DEBUG};
1396
1397   if (defined $stdin) {
1398     my $child = open STDIN, "-|";
1399     defined $child or die "no fork: $!";
1400     if ($child == 0) {
1401       print $stdin or die $!;
1402       close STDOUT or die $!;
1403       exit 0;
1404     }
1405   }
1406
1407   return system (@$args) if $opts->{fork};
1408
1409   exec @$args;
1410   warn "ENV size is ".length(join(" ",%ENV));
1411   die "exec failed: $!: @$args";
1412 }
1413
1414
1415 sub ban_node_by_slot {
1416   # Don't start any new jobsteps on this node for 60 seconds
1417   my $slotid = shift;
1418   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1419   $slot[$slotid]->{node}->{hold_count}++;
1420   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1421 }
1422
1423 sub must_lock_now
1424 {
1425   my ($lockfile, $error_message) = @_;
1426   open L, ">", $lockfile or croak("$lockfile: $!");
1427   if (!flock L, LOCK_EX|LOCK_NB) {
1428     croak("Can't lock $lockfile: $error_message\n");
1429   }
1430 }
1431
1432 __DATA__
1433 #!/usr/bin/perl
1434
1435 # checkout-and-build
1436
1437 use Fcntl ':flock';
1438
1439 my $destdir = $ENV{"CRUNCH_SRC"};
1440 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1441 my $repo = $ENV{"CRUNCH_SRC_URL"};
1442
1443 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1444 flock L, LOCK_EX;
1445 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1446     exit 0;
1447 }
1448
1449 unlink "$destdir.commit";
1450 open STDOUT, ">", "$destdir.log";
1451 open STDERR, ">&STDOUT";
1452
1453 mkdir $destdir;
1454 my @git_archive_data = <DATA>;
1455 if (@git_archive_data) {
1456   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1457   print TARX @git_archive_data;
1458   if(!close(TARX)) {
1459     die "'tar -C $destdir -xf -' exited $?: $!";
1460   }
1461 }
1462
1463 my $pwd;
1464 chomp ($pwd = `pwd`);
1465 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1466 mkdir $install_dir;
1467
1468 for my $src_path ("$destdir/arvados/sdk/python") {
1469   if (-d $src_path) {
1470     shell_or_die ("virtualenv", $install_dir);
1471     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1472   }
1473 }
1474
1475 if (-e "$destdir/crunch_scripts/install") {
1476     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1477 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1478     # Old version
1479     shell_or_die ("./tests/autotests.sh", $install_dir);
1480 } elsif (-e "./install.sh") {
1481     shell_or_die ("./install.sh", $install_dir);
1482 }
1483
1484 if ($commit) {
1485     unlink "$destdir.commit.new";
1486     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1487     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1488 }
1489
1490 close L;
1491
1492 exit 0;
1493
1494 sub shell_or_die
1495 {
1496   if ($ENV{"DEBUG"}) {
1497     print STDERR "@_\n";
1498   }
1499   system (@_) == 0
1500       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1501 }
1502
1503 __DATA__