2879: crunch-job uses Job Docker information provided by API server.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $arv_cli;
100
101 if (defined $ENV{"ARV_CLI"}) {
102   $arv_cli = $ENV{"ARV_CLI"};
103 }
104 else {
105   $arv_cli = 'arv';
106 }
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     if ($Job->{'is_locked_by_uuid'}) {
156       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157     }
158     if ($Job->{'success'} ne undef) {
159       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160     }
161     if ($Job->{'running'}) {
162       croak("Job 'running' flag is already set");
163     }
164     if ($Job->{'started_at'}) {
165       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
166     }
167   }
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172
173   if (!$resume_stash)
174   {
175     map { croak ("No $_ specified") unless $Job->{$_} }
176     qw(script script_version script_parameters);
177   }
178
179   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180   $Job->{'started_at'} = gmtime;
181
182   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183
184   $job_has_uuid = 1;
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271
272 my $jobmanager_id;
273 if ($job_has_uuid)
274 {
275   # Claim this job, and make sure nobody else does
276   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278     croak("Error while updating / locking job");
279   }
280   $Job->update_attributes('started_at' => scalar gmtime,
281                           'running' => 1,
282                           'success' => undef,
283                           'tasks_summary' => { 'failed' => 0,
284                                                'todo' => 1,
285                                                'running' => 0,
286                                                'done' => 0 });
287 }
288
289
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
304
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
308
309
310 my @jobstep;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
315 my $squeue_checked;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
319
320
321
322 if (defined $Job->{thawedfromkey})
323 {
324   thaw ($Job->{thawedfromkey});
325 }
326 else
327 {
328   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329     'job_uuid' => $Job->{'uuid'},
330     'sequence' => 0,
331     'qsequence' => 0,
332     'parameters' => {},
333                                                           });
334   push @jobstep, { 'level' => 0,
335                    'failures' => 0,
336                    'arvados_task' => $first_task,
337                  };
338   push @jobstep_todo, 0;
339 }
340
341
342 if (!$have_slurm)
343 {
344   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
345 }
346
347
348 my $build_script;
349
350
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if ($skip_install)
355 {
356   if (!defined $no_clear_tmp) {
357     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358     system($clear_tmp_cmd) == 0
359         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360   }
361   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363     if (-d $src_path) {
364       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367           == 0
368           or croak ("setup.py in $src_path failed: exit ".($?>>8));
369     }
370   }
371 }
372 else
373 {
374   do {
375     local $/ = undef;
376     $build_script = <DATA>;
377   };
378   Log (undef, "Install revision ".$Job->{script_version});
379   my $nodelist = join(",", @node);
380
381   if (!defined $no_clear_tmp) {
382     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383
384     my $cleanpid = fork();
385     if ($cleanpid == 0)
386     {
387       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
389       exit (1);
390     }
391     while (1)
392     {
393       last if $cleanpid == waitpid (-1, WNOHANG);
394       freeze_if_want_freeze ($cleanpid);
395       select (undef, undef, undef, 0.1);
396     }
397     Log (undef, "Clean-work-dir exited $?");
398   }
399
400   # Install requested code version
401
402   my @execargs;
403   my @srunargs = ("srun",
404                   "--nodelist=$nodelist",
405                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406
407   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
409
410   my $commit;
411   my $git_archive;
412   my $treeish = $Job->{'script_version'};
413
414   # If we're running under crunch-dispatch, it will have pulled the
415   # appropriate source tree into its own repository, and given us that
416   # repo's path as $git_dir. If we're running a "local" job, and a
417   # script_version was specified, it's up to the user to provide the
418   # full path to a local repository in Job->{repository}.
419   #
420   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421   # git-archive --remote where appropriate.
422   #
423   # TODO: Accept a locally-hosted Arvados repository by name or
424   # UUID. Use arvados.v1.repositories.list or .get to figure out the
425   # appropriate fetch-url.
426   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427
428   $ENV{"CRUNCH_SRC_URL"} = $repo;
429
430   if (-d "$repo/.git") {
431     # We were given a working directory, but we are only interested in
432     # the index.
433     $repo = "$repo/.git";
434   }
435
436   # If this looks like a subversion r#, look for it in git-svn commit messages
437
438   if ($treeish =~ m{^\d{1,4}$}) {
439     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440     chomp $gitlog;
441     if ($gitlog =~ /^[a-f0-9]{40}$/) {
442       $commit = $gitlog;
443       Log (undef, "Using commit $commit for script_version $treeish");
444     }
445   }
446
447   # If that didn't work, try asking git to look it up as a tree-ish.
448
449   if (!defined $commit) {
450     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451     chomp $found;
452     if ($found =~ /^[0-9a-f]{40}$/s) {
453       $commit = $found;
454       if ($commit ne $treeish) {
455         # Make sure we record the real commit id in the database,
456         # frozentokey, logs, etc. -- instead of an abbreviation or a
457         # branch name which can become ambiguous or point to a
458         # different commit in the future.
459         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460         Log (undef, "Using commit $commit for tree-ish $treeish");
461         if ($commit ne $treeish) {
462           $Job->{'script_version'} = $commit;
463           !$job_has_uuid or
464               $Job->update_attributes('script_version' => $commit) or
465               croak("Error while updating job");
466         }
467       }
468     }
469   }
470
471   if (defined $commit) {
472     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473     @execargs = ("sh", "-c",
474                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476   }
477   else {
478     croak ("could not figure out commit id for $treeish");
479   }
480
481   my $installpid = fork();
482   if ($installpid == 0)
483   {
484     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
485     exit (1);
486   }
487   while (1)
488   {
489     last if $installpid == waitpid (-1, WNOHANG);
490     freeze_if_want_freeze ($installpid);
491     select (undef, undef, undef, 0.1);
492   }
493   Log (undef, "Install exited $?");
494 }
495
496 if (!$have_slurm)
497 {
498   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 }
501
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my ($docker_locator, $docker_hash);
505 if ($docker_locator = $Job->{docker_image_locator}) {
506   $docker_hash = find_docker_hash($docker_locator);
507   if (!$docker_hash)
508   {
509     croak("No Docker image hash found from locator $docker_locator");
510   }
511   my $docker_install_script = qq{
512 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
513     arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
514 fi
515 };
516   my $docker_pid = fork();
517   if ($docker_pid == 0)
518   {
519     srun (["srun", "--nodelist=" . join(',', @node)],
520           ["/bin/sh", "-ec", $docker_install_script]);
521     exit ($?);
522   }
523   while (1)
524   {
525     last if $docker_pid == waitpid (-1, WNOHANG);
526     freeze_if_want_freeze ($docker_pid);
527     select (undef, undef, undef, 0.1);
528   }
529   if ($? != 0)
530   {
531     croak("Installing Docker image from $docker_locator returned exit code $?");
532   }
533 }
534
535 foreach (qw (script script_version script_parameters runtime_constraints))
536 {
537   Log (undef,
538        "$_ " .
539        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
540 }
541 foreach (split (/\n/, $Job->{knobs}))
542 {
543   Log (undef, "knob " . $_);
544 }
545
546
547
548 $main::success = undef;
549
550
551
552 ONELEVEL:
553
554 my $thisround_succeeded = 0;
555 my $thisround_failed = 0;
556 my $thisround_failed_multiple = 0;
557
558 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
559                        or $a <=> $b } @jobstep_todo;
560 my $level = $jobstep[$jobstep_todo[0]]->{level};
561 Log (undef, "start level $level");
562
563
564
565 my %proc;
566 my @freeslot = (0..$#slot);
567 my @holdslot;
568 my %reader;
569 my $progress_is_dirty = 1;
570 my $progress_stats_updated = 0;
571
572 update_progress_stats();
573
574
575
576 THISROUND:
577 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
578 {
579   my $id = $jobstep_todo[$todo_ptr];
580   my $Jobstep = $jobstep[$id];
581   if ($Jobstep->{level} != $level)
582   {
583     next;
584   }
585
586   pipe $reader{$id}, "writer" or croak ($!);
587   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
588   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
589
590   my $childslot = $freeslot[0];
591   my $childnode = $slot[$childslot]->{node};
592   my $childslotname = join (".",
593                             $slot[$childslot]->{node}->{name},
594                             $slot[$childslot]->{cpu});
595   my $childpid = fork();
596   if ($childpid == 0)
597   {
598     $SIG{'INT'} = 'DEFAULT';
599     $SIG{'QUIT'} = 'DEFAULT';
600     $SIG{'TERM'} = 'DEFAULT';
601
602     foreach (values (%reader))
603     {
604       close($_);
605     }
606     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
607     open(STDOUT,">&writer");
608     open(STDERR,">&writer");
609
610     undef $dbh;
611     undef $sth;
612
613     delete $ENV{"GNUPGHOME"};
614     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
615     $ENV{"TASK_QSEQUENCE"} = $id;
616     $ENV{"TASK_SEQUENCE"} = $level;
617     $ENV{"JOB_SCRIPT"} = $Job->{script};
618     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
619       $param =~ tr/a-z/A-Z/;
620       $ENV{"JOB_PARAMETER_$param"} = $value;
621     }
622     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
623     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
624     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
625     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
626     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
627     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
628     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
629
630     $ENV{"GZIP"} = "-n";
631
632     my @srunargs = (
633       "srun",
634       "--nodelist=".$childnode->{name},
635       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
636       "--job-name=$job_id.$id.$$",
637         );
638     my $build_script_to_send = "";
639     my $command =
640         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
641         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
642         ."&& cd $ENV{CRUNCH_TMP} ";
643     if ($build_script)
644     {
645       $build_script_to_send = $build_script;
646       $command .=
647           "&& perl -";
648     }
649     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
650     if ($docker_hash)
651     {
652       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
653       $command .= "$docker_bin run -i -a stdin -a stdout -a stderr --cidfile=$ENV{TASK_WORK}/docker.cid ";
654       # Dynamically configure the container to use the host system as its
655       # DNS server.  Get the host's global addresses from the ip command,
656       # and turn them into docker --dns options using gawk.
657       $command .=
658           q{$(ip -o address show scope global |
659               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
660       foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
661       {
662         $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
663       }
664       while (my ($env_key, $env_val) = each %ENV)
665       {
666         if ($env_key =~ /^(JOB|TASK)_/) {
667           $command .= "-e \Q$env_key=$env_val\E ";
668         }
669       }
670       $command .= "\Q$docker_hash\E ";
671     } else {
672       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "
673     }
674     $command .= "stdbuf -o0 -e0 ";
675     $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
676     my @execargs = ('bash', '-c', $command);
677     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
678     exit (111);
679   }
680   close("writer");
681   if (!defined $childpid)
682   {
683     close $reader{$id};
684     delete $reader{$id};
685     next;
686   }
687   shift @freeslot;
688   $proc{$childpid} = { jobstep => $id,
689                        time => time,
690                        slot => $childslot,
691                        jobstepname => "$job_id.$id.$childpid",
692                      };
693   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
694   $slot[$childslot]->{pid} = $childpid;
695
696   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
697   Log ($id, "child $childpid started on $childslotname");
698   $Jobstep->{starttime} = time;
699   $Jobstep->{node} = $childnode->{name};
700   $Jobstep->{slotindex} = $childslot;
701   delete $Jobstep->{stderr};
702   delete $Jobstep->{finishtime};
703
704   splice @jobstep_todo, $todo_ptr, 1;
705   --$todo_ptr;
706
707   $progress_is_dirty = 1;
708
709   while (!@freeslot
710          ||
711          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
712   {
713     last THISROUND if $main::please_freeze;
714     if ($main::please_info)
715     {
716       $main::please_info = 0;
717       freeze();
718       collate_output();
719       save_meta(1);
720       update_progress_stats();
721     }
722     my $gotsome
723         = readfrompipes ()
724         + reapchildren ();
725     if (!$gotsome)
726     {
727       check_refresh_wanted();
728       check_squeue();
729       update_progress_stats();
730       select (undef, undef, undef, 0.1);
731     }
732     elsif (time - $progress_stats_updated >= 30)
733     {
734       update_progress_stats();
735     }
736     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
737         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
738     {
739       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
740           .($thisround_failed+$thisround_succeeded)
741           .") -- giving up on this round";
742       Log (undef, $message);
743       last THISROUND;
744     }
745
746     # move slots from freeslot to holdslot (or back to freeslot) if necessary
747     for (my $i=$#freeslot; $i>=0; $i--) {
748       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
749         push @holdslot, (splice @freeslot, $i, 1);
750       }
751     }
752     for (my $i=$#holdslot; $i>=0; $i--) {
753       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
754         push @freeslot, (splice @holdslot, $i, 1);
755       }
756     }
757
758     # give up if no nodes are succeeding
759     if (!grep { $_->{node}->{losing_streak} == 0 &&
760                     $_->{node}->{hold_count} < 4 } @slot) {
761       my $message = "Every node has failed -- giving up on this round";
762       Log (undef, $message);
763       last THISROUND;
764     }
765   }
766 }
767
768
769 push @freeslot, splice @holdslot;
770 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
771
772
773 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
774 while (%proc)
775 {
776   if ($main::please_continue) {
777     $main::please_continue = 0;
778     goto THISROUND;
779   }
780   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
781   readfrompipes ();
782   if (!reapchildren())
783   {
784     check_refresh_wanted();
785     check_squeue();
786     update_progress_stats();
787     select (undef, undef, undef, 0.1);
788     killem (keys %proc) if $main::please_freeze;
789   }
790 }
791
792 update_progress_stats();
793 freeze_if_want_freeze();
794
795
796 if (!defined $main::success)
797 {
798   if (@jobstep_todo &&
799       $thisround_succeeded == 0 &&
800       ($thisround_failed == 0 || $thisround_failed > 4))
801   {
802     my $message = "stop because $thisround_failed tasks failed and none succeeded";
803     Log (undef, $message);
804     $main::success = 0;
805   }
806   if (!@jobstep_todo)
807   {
808     $main::success = 1;
809   }
810 }
811
812 goto ONELEVEL if !defined $main::success;
813
814
815 release_allocation();
816 freeze();
817 my $collated_output = &collate_output();
818
819 if ($job_has_uuid) {
820   $Job->update_attributes('running' => 0,
821                           'success' => $collated_output && $main::success,
822                           'finished_at' => scalar gmtime)
823 }
824
825 if ($collated_output)
826 {
827   eval {
828     open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
829         or die "failed to get collated manifest: $!";
830     # Read the original manifest, and strip permission hints from it,
831     # so we can put the result in a Collection.
832     my @stripped_manifest_lines = ();
833     my $orig_manifest_text = '';
834     while (my $manifest_line = <$orig_manifest>) {
835       $orig_manifest_text .= $manifest_line;
836       my @words = split(/ /, $manifest_line, -1);
837       foreach my $ii (0..$#words) {
838         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
839           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
840         }
841       }
842       push(@stripped_manifest_lines, join(" ", @words));
843     }
844     my $stripped_manifest_text = join("", @stripped_manifest_lines);
845     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
846       'uuid' => md5_hex($stripped_manifest_text),
847       'manifest_text' => $orig_manifest_text,
848     });
849     $Job->update_attributes('output' => $output->{uuid});
850     if ($Job->{'output_is_persistent'}) {
851       $arv->{'links'}->{'create'}->execute('link' => {
852         'tail_kind' => 'arvados#user',
853         'tail_uuid' => $User->{'uuid'},
854         'head_kind' => 'arvados#collection',
855         'head_uuid' => $Job->{'output'},
856         'link_class' => 'resources',
857         'name' => 'wants',
858       });
859     }
860   };
861   if ($@) {
862     Log (undef, "Failed to register output manifest: $@");
863   }
864 }
865
866 Log (undef, "finish");
867
868 save_meta();
869 exit 0;
870
871
872
873 sub update_progress_stats
874 {
875   $progress_stats_updated = time;
876   return if !$progress_is_dirty;
877   my ($todo, $done, $running) = (scalar @jobstep_todo,
878                                  scalar @jobstep_done,
879                                  scalar @slot - scalar @freeslot - scalar @holdslot);
880   $Job->{'tasks_summary'} ||= {};
881   $Job->{'tasks_summary'}->{'todo'} = $todo;
882   $Job->{'tasks_summary'}->{'done'} = $done;
883   $Job->{'tasks_summary'}->{'running'} = $running;
884   if ($job_has_uuid) {
885     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
886   }
887   Log (undef, "status: $done done, $running running, $todo todo");
888   $progress_is_dirty = 0;
889 }
890
891
892
893 sub reapchildren
894 {
895   my $pid = waitpid (-1, WNOHANG);
896   return 0 if $pid <= 0;
897
898   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
899                   . "."
900                   . $slot[$proc{$pid}->{slot}]->{cpu});
901   my $jobstepid = $proc{$pid}->{jobstep};
902   my $elapsed = time - $proc{$pid}->{time};
903   my $Jobstep = $jobstep[$jobstepid];
904
905   my $childstatus = $?;
906   my $exitvalue = $childstatus >> 8;
907   my $exitinfo = sprintf("exit %d signal %d%s",
908                          $exitvalue,
909                          $childstatus & 127,
910                          ($childstatus & 128 ? ' core dump' : ''));
911   $Jobstep->{'arvados_task'}->reload;
912   my $task_success = $Jobstep->{'arvados_task'}->{success};
913
914   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
915
916   if (!defined $task_success) {
917     # task did not indicate one way or the other --> fail
918     $Jobstep->{'arvados_task'}->{success} = 0;
919     $Jobstep->{'arvados_task'}->save;
920     $task_success = 0;
921   }
922
923   if (!$task_success)
924   {
925     my $temporary_fail;
926     $temporary_fail ||= $Jobstep->{node_fail};
927     $temporary_fail ||= ($exitvalue == 111);
928
929     ++$thisround_failed;
930     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
931
932     # Check for signs of a failed or misconfigured node
933     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
934         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
935       # Don't count this against jobstep failure thresholds if this
936       # node is already suspected faulty and srun exited quickly
937       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
938           $elapsed < 5) {
939         Log ($jobstepid, "blaming failure on suspect node " .
940              $slot[$proc{$pid}->{slot}]->{node}->{name});
941         $temporary_fail ||= 1;
942       }
943       ban_node_by_slot($proc{$pid}->{slot});
944     }
945
946     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
947                              ++$Jobstep->{'failures'},
948                              $temporary_fail ? 'temporary ' : 'permanent',
949                              $elapsed));
950
951     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
952       # Give up on this task, and the whole job
953       $main::success = 0;
954       $main::please_freeze = 1;
955     }
956     else {
957       # Put this task back on the todo queue
958       push @jobstep_todo, $jobstepid;
959     }
960     $Job->{'tasks_summary'}->{'failed'}++;
961   }
962   else
963   {
964     ++$thisround_succeeded;
965     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
966     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
967     push @jobstep_done, $jobstepid;
968     Log ($jobstepid, "success in $elapsed seconds");
969   }
970   $Jobstep->{exitcode} = $childstatus;
971   $Jobstep->{finishtime} = time;
972   process_stderr ($jobstepid, $task_success);
973   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
974
975   close $reader{$jobstepid};
976   delete $reader{$jobstepid};
977   delete $slot[$proc{$pid}->{slot}]->{pid};
978   push @freeslot, $proc{$pid}->{slot};
979   delete $proc{$pid};
980
981   # Load new tasks
982   my $newtask_list = [];
983   my $newtask_results;
984   do {
985     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
986       'where' => {
987         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
988       },
989       'order' => 'qsequence',
990       'offset' => scalar(@$newtask_list),
991     );
992     push(@$newtask_list, @{$newtask_results->{items}});
993   } while (@{$newtask_results->{items}});
994   foreach my $arvados_task (@$newtask_list) {
995     my $jobstep = {
996       'level' => $arvados_task->{'sequence'},
997       'failures' => 0,
998       'arvados_task' => $arvados_task
999     };
1000     push @jobstep, $jobstep;
1001     push @jobstep_todo, $#jobstep;
1002   }
1003
1004   $progress_is_dirty = 1;
1005   1;
1006 }
1007
1008 sub check_refresh_wanted
1009 {
1010   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1011   if (@stat && $stat[9] > $latest_refresh) {
1012     $latest_refresh = scalar time;
1013     if ($job_has_uuid) {
1014       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1015       for my $attr ('cancelled_at',
1016                     'cancelled_by_user_uuid',
1017                     'cancelled_by_client_uuid') {
1018         $Job->{$attr} = $Job2->{$attr};
1019       }
1020       if ($Job->{'cancelled_at'}) {
1021         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1022              " by user " . $Job->{cancelled_by_user_uuid});
1023         $main::success = 0;
1024         $main::please_freeze = 1;
1025       }
1026     }
1027   }
1028 }
1029
1030 sub check_squeue
1031 {
1032   # return if the kill list was checked <4 seconds ago
1033   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1034   {
1035     return;
1036   }
1037   $squeue_kill_checked = time;
1038
1039   # use killem() on procs whose killtime is reached
1040   for (keys %proc)
1041   {
1042     if (exists $proc{$_}->{killtime}
1043         && $proc{$_}->{killtime} <= time)
1044     {
1045       killem ($_);
1046     }
1047   }
1048
1049   # return if the squeue was checked <60 seconds ago
1050   if (defined $squeue_checked && $squeue_checked > time - 60)
1051   {
1052     return;
1053   }
1054   $squeue_checked = time;
1055
1056   if (!$have_slurm)
1057   {
1058     # here is an opportunity to check for mysterious problems with local procs
1059     return;
1060   }
1061
1062   # get a list of steps still running
1063   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1064   chop @squeue;
1065   if ($squeue[-1] ne "ok")
1066   {
1067     return;
1068   }
1069   pop @squeue;
1070
1071   # which of my jobsteps are running, according to squeue?
1072   my %ok;
1073   foreach (@squeue)
1074   {
1075     if (/^(\d+)\.(\d+) (\S+)/)
1076     {
1077       if ($1 eq $ENV{SLURM_JOBID})
1078       {
1079         $ok{$3} = 1;
1080       }
1081     }
1082   }
1083
1084   # which of my active child procs (>60s old) were not mentioned by squeue?
1085   foreach (keys %proc)
1086   {
1087     if ($proc{$_}->{time} < time - 60
1088         && !exists $ok{$proc{$_}->{jobstepname}}
1089         && !exists $proc{$_}->{killtime})
1090     {
1091       # kill this proc if it hasn't exited in 30 seconds
1092       $proc{$_}->{killtime} = time + 30;
1093     }
1094   }
1095 }
1096
1097
1098 sub release_allocation
1099 {
1100   if ($have_slurm)
1101   {
1102     Log (undef, "release job allocation");
1103     system "scancel $ENV{SLURM_JOBID}";
1104   }
1105 }
1106
1107
1108 sub readfrompipes
1109 {
1110   my $gotsome = 0;
1111   foreach my $job (keys %reader)
1112   {
1113     my $buf;
1114     while (0 < sysread ($reader{$job}, $buf, 8192))
1115     {
1116       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1117       $jobstep[$job]->{stderr} .= $buf;
1118       preprocess_stderr ($job);
1119       if (length ($jobstep[$job]->{stderr}) > 16384)
1120       {
1121         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1122       }
1123       $gotsome = 1;
1124     }
1125   }
1126   return $gotsome;
1127 }
1128
1129
1130 sub preprocess_stderr
1131 {
1132   my $job = shift;
1133
1134   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1135     my $line = $1;
1136     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1137     Log ($job, "stderr $line");
1138     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1139       # whoa.
1140       $main::please_freeze = 1;
1141     }
1142     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1143       $jobstep[$job]->{node_fail} = 1;
1144       ban_node_by_slot($jobstep[$job]->{slotindex});
1145     }
1146   }
1147 }
1148
1149
1150 sub process_stderr
1151 {
1152   my $job = shift;
1153   my $task_success = shift;
1154   preprocess_stderr ($job);
1155
1156   map {
1157     Log ($job, "stderr $_");
1158   } split ("\n", $jobstep[$job]->{stderr});
1159 }
1160
1161 sub fetch_block
1162 {
1163   my $hash = shift;
1164   my ($keep, $child_out, $output_block);
1165
1166   my $cmd = "$arv_cli keep get \Q$hash\E";
1167   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1168   sysread($keep, $output_block, 64 * 1024 * 1024);
1169   close $keep;
1170   return $output_block;
1171 }
1172
1173 sub collate_output
1174 {
1175   Log (undef, "collate");
1176
1177   my ($child_out, $child_in);
1178   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1179   my $joboutput;
1180   for (@jobstep)
1181   {
1182     next if (!exists $_->{'arvados_task'}->{output} ||
1183              !$_->{'arvados_task'}->{'success'} ||
1184              $_->{'exitcode'} != 0);
1185     my $output = $_->{'arvados_task'}->{output};
1186     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1187     {
1188       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1189       print $child_in $output;
1190     }
1191     elsif (@jobstep == 1)
1192     {
1193       $joboutput = $output;
1194       last;
1195     }
1196     elsif (defined (my $outblock = fetch_block ($output)))
1197     {
1198       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1199       print $child_in $outblock;
1200     }
1201     else
1202     {
1203       Log (undef, "XXX fetch_block($output) failed XXX");
1204       $main::success = 0;
1205     }
1206   }
1207   $child_in->close;
1208
1209   if (!defined $joboutput) {
1210     my $s = IO::Select->new($child_out);
1211     if ($s->can_read(120)) {
1212       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1213       chomp($joboutput);
1214     } else {
1215       Log (undef, "timed out reading from 'arv keep put'");
1216     }
1217   }
1218   waitpid($pid, 0);
1219
1220   if ($joboutput)
1221   {
1222     Log (undef, "output $joboutput");
1223     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1224   }
1225   else
1226   {
1227     Log (undef, "output undef");
1228   }
1229   return $joboutput;
1230 }
1231
1232
1233 sub killem
1234 {
1235   foreach (@_)
1236   {
1237     my $sig = 2;                # SIGINT first
1238     if (exists $proc{$_}->{"sent_$sig"} &&
1239         time - $proc{$_}->{"sent_$sig"} > 4)
1240     {
1241       $sig = 15;                # SIGTERM if SIGINT doesn't work
1242     }
1243     if (exists $proc{$_}->{"sent_$sig"} &&
1244         time - $proc{$_}->{"sent_$sig"} > 4)
1245     {
1246       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1247     }
1248     if (!exists $proc{$_}->{"sent_$sig"})
1249     {
1250       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1251       kill $sig, $_;
1252       select (undef, undef, undef, 0.1);
1253       if ($sig == 2)
1254       {
1255         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1256       }
1257       $proc{$_}->{"sent_$sig"} = time;
1258       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1259     }
1260   }
1261 }
1262
1263
1264 sub fhbits
1265 {
1266   my($bits);
1267   for (@_) {
1268     vec($bits,fileno($_),1) = 1;
1269   }
1270   $bits;
1271 }
1272
1273
1274 sub Log                         # ($jobstep_id, $logmessage)
1275 {
1276   if ($_[1] =~ /\n/) {
1277     for my $line (split (/\n/, $_[1])) {
1278       Log ($_[0], $line);
1279     }
1280     return;
1281   }
1282   my $fh = select STDERR; $|=1; select $fh;
1283   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1284   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1285   $message .= "\n";
1286   my $datetime;
1287   if ($local_logfile || -t STDERR) {
1288     my @gmtime = gmtime;
1289     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1290                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1291   }
1292   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1293
1294   if ($local_logfile) {
1295     print $local_logfile $datetime . " " . $message;
1296   }
1297 }
1298
1299
1300 sub croak
1301 {
1302   my ($package, $file, $line) = caller;
1303   my $message = "@_ at $file line $line\n";
1304   Log (undef, $message);
1305   freeze() if @jobstep_todo;
1306   collate_output() if @jobstep_todo;
1307   cleanup();
1308   save_meta() if $local_logfile;
1309   die;
1310 }
1311
1312
1313 sub cleanup
1314 {
1315   return if !$job_has_uuid;
1316   $Job->update_attributes('running' => 0,
1317                           'success' => 0,
1318                           'finished_at' => scalar gmtime);
1319 }
1320
1321
1322 sub save_meta
1323 {
1324   my $justcheckpoint = shift; # false if this will be the last meta saved
1325   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1326
1327   $local_logfile->flush;
1328   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1329       . quotemeta($local_logfile->filename);
1330   my $loglocator = `$cmd`;
1331   die "system $cmd failed: $?" if $?;
1332   chomp($loglocator);
1333
1334   $local_logfile = undef;   # the temp file is automatically deleted
1335   Log (undef, "log manifest is $loglocator");
1336   $Job->{'log'} = $loglocator;
1337   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1338 }
1339
1340
1341 sub freeze_if_want_freeze
1342 {
1343   if ($main::please_freeze)
1344   {
1345     release_allocation();
1346     if (@_)
1347     {
1348       # kill some srun procs before freeze+stop
1349       map { $proc{$_} = {} } @_;
1350       while (%proc)
1351       {
1352         killem (keys %proc);
1353         select (undef, undef, undef, 0.1);
1354         my $died;
1355         while (($died = waitpid (-1, WNOHANG)) > 0)
1356         {
1357           delete $proc{$died};
1358         }
1359       }
1360     }
1361     freeze();
1362     collate_output();
1363     cleanup();
1364     save_meta();
1365     exit 0;
1366   }
1367 }
1368
1369
1370 sub freeze
1371 {
1372   Log (undef, "Freeze not implemented");
1373   return;
1374 }
1375
1376
1377 sub thaw
1378 {
1379   croak ("Thaw not implemented");
1380 }
1381
1382
1383 sub freezequote
1384 {
1385   my $s = shift;
1386   $s =~ s/\\/\\\\/g;
1387   $s =~ s/\n/\\n/g;
1388   return $s;
1389 }
1390
1391
1392 sub freezeunquote
1393 {
1394   my $s = shift;
1395   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1396   return $s;
1397 }
1398
1399
1400 sub srun
1401 {
1402   my $srunargs = shift;
1403   my $execargs = shift;
1404   my $opts = shift || {};
1405   my $stdin = shift;
1406   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1407   print STDERR (join (" ",
1408                       map { / / ? "'$_'" : $_ }
1409                       (@$args)),
1410                 "\n")
1411       if $ENV{CRUNCH_DEBUG};
1412
1413   if (defined $stdin) {
1414     my $child = open STDIN, "-|";
1415     defined $child or die "no fork: $!";
1416     if ($child == 0) {
1417       print $stdin or die $!;
1418       close STDOUT or die $!;
1419       exit 0;
1420     }
1421   }
1422
1423   return system (@$args) if $opts->{fork};
1424
1425   exec @$args;
1426   warn "ENV size is ".length(join(" ",%ENV));
1427   die "exec failed: $!: @$args";
1428 }
1429
1430
1431 sub ban_node_by_slot {
1432   # Don't start any new jobsteps on this node for 60 seconds
1433   my $slotid = shift;
1434   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1435   $slot[$slotid]->{node}->{hold_count}++;
1436   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1437 }
1438
1439 sub must_lock_now
1440 {
1441   my ($lockfile, $error_message) = @_;
1442   open L, ">", $lockfile or croak("$lockfile: $!");
1443   if (!flock L, LOCK_EX|LOCK_NB) {
1444     croak("Can't lock $lockfile: $error_message\n");
1445   }
1446 }
1447
1448 sub find_docker_hash {
1449   # Given a Keep locator, search for a matching link to find the Docker hash
1450   # of the stored image.
1451   my $locator = shift;
1452   my $links_result = $arv->{links}->{list}->execute(
1453     filters => [["head_uuid", "=", $locator],
1454                 ["link_class", "=", "docker_image_hash"]],
1455     limit => 1);
1456   my $docker_hash;
1457   foreach my $link (@{$links_result->{items}}) {
1458     $docker_hash = lc($link->{name});
1459   }
1460   return $docker_hash;
1461 }
1462
1463 __DATA__
1464 #!/usr/bin/perl
1465
1466 # checkout-and-build
1467
1468 use Fcntl ':flock';
1469
1470 my $destdir = $ENV{"CRUNCH_SRC"};
1471 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1472 my $repo = $ENV{"CRUNCH_SRC_URL"};
1473
1474 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1475 flock L, LOCK_EX;
1476 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1477     exit 0;
1478 }
1479
1480 unlink "$destdir.commit";
1481 open STDOUT, ">", "$destdir.log";
1482 open STDERR, ">&STDOUT";
1483
1484 mkdir $destdir;
1485 my @git_archive_data = <DATA>;
1486 if (@git_archive_data) {
1487   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1488   print TARX @git_archive_data;
1489   if(!close(TARX)) {
1490     die "'tar -C $destdir -xf -' exited $?: $!";
1491   }
1492 }
1493
1494 my $pwd;
1495 chomp ($pwd = `pwd`);
1496 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1497 mkdir $install_dir;
1498
1499 for my $src_path ("$destdir/arvados/sdk/python") {
1500   if (-d $src_path) {
1501     shell_or_die ("virtualenv", $install_dir);
1502     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1503   }
1504 }
1505
1506 if (-e "$destdir/crunch_scripts/install") {
1507     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1508 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1509     # Old version
1510     shell_or_die ("./tests/autotests.sh", $install_dir);
1511 } elsif (-e "./install.sh") {
1512     shell_or_die ("./install.sh", $install_dir);
1513 }
1514
1515 if ($commit) {
1516     unlink "$destdir.commit.new";
1517     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1518     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1519 }
1520
1521 close L;
1522
1523 exit 0;
1524
1525 sub shell_or_die
1526 {
1527   if ($ENV{"DEBUG"}) {
1528     print STDERR "@_\n";
1529   }
1530   system (@_) == 0
1531       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1532 }
1533
1534 __DATA__