9c9d9c1ec96ed79796ae13e002bfc00538feb874
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $arv_cli;
100
101 if (defined $ENV{"ARV_CLI"}) {
102   $arv_cli = $ENV{"ARV_CLI"};
103 }
104 else {
105   $arv_cli = 'arv';
106 }
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     if ($Job->{'is_locked_by_uuid'}) {
156       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
157     }
158     if ($Job->{'success'} ne undef) {
159       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160     }
161     if ($Job->{'running'}) {
162       croak("Job 'running' flag is already set");
163     }
164     if ($Job->{'started_at'}) {
165       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
166     }
167   }
168 }
169 else
170 {
171   $Job = JSON::decode_json($jobspec);
172
173   if (!$resume_stash)
174   {
175     map { croak ("No $_ specified") unless $Job->{$_} }
176     qw(script script_version script_parameters);
177   }
178
179   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
180   $Job->{'started_at'} = gmtime;
181
182   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183
184   $job_has_uuid = 1;
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 $local_logfile = File::Temp->new();
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271
272 my $jobmanager_id;
273 if ($job_has_uuid)
274 {
275   # Claim this job, and make sure nobody else does
276   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
277           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
278     croak("Error while updating / locking job");
279   }
280   $Job->update_attributes('started_at' => scalar gmtime,
281                           'running' => 1,
282                           'success' => undef,
283                           'tasks_summary' => { 'failed' => 0,
284                                                'todo' => 1,
285                                                'running' => 0,
286                                                'done' => 0 });
287 }
288
289
290 Log (undef, "start");
291 $SIG{'INT'} = sub { $main::please_freeze = 1; };
292 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
293 $SIG{'TERM'} = \&croak;
294 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
295 $SIG{'ALRM'} = sub { $main::please_info = 1; };
296 $SIG{'CONT'} = sub { $main::please_continue = 1; };
297 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298
299 $main::please_freeze = 0;
300 $main::please_info = 0;
301 $main::please_continue = 0;
302 $main::please_refresh = 0;
303 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
304
305 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
306 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
307 $ENV{"JOB_UUID"} = $job_id;
308
309
310 my @jobstep;
311 my @jobstep_todo = ();
312 my @jobstep_done = ();
313 my @jobstep_tomerge = ();
314 my $jobstep_tomerge_level = 0;
315 my $squeue_checked;
316 my $squeue_kill_checked;
317 my $output_in_keep = 0;
318 my $latest_refresh = scalar time;
319
320
321
322 if (defined $Job->{thawedfromkey})
323 {
324   thaw ($Job->{thawedfromkey});
325 }
326 else
327 {
328   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
329     'job_uuid' => $Job->{'uuid'},
330     'sequence' => 0,
331     'qsequence' => 0,
332     'parameters' => {},
333                                                           });
334   push @jobstep, { 'level' => 0,
335                    'failures' => 0,
336                    'arvados_task' => $first_task,
337                  };
338   push @jobstep_todo, 0;
339 }
340
341
342 if (!$have_slurm)
343 {
344   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
345 }
346
347
348 my $build_script;
349
350
351 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
352
353 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
354 if ($skip_install)
355 {
356   if (!defined $no_clear_tmp) {
357     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
358     system($clear_tmp_cmd) == 0
359         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360   }
361   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
362   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363     if (-d $src_path) {
364       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
365           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
366       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367           == 0
368           or croak ("setup.py in $src_path failed: exit ".($?>>8));
369     }
370   }
371 }
372 else
373 {
374   do {
375     local $/ = undef;
376     $build_script = <DATA>;
377   };
378   Log (undef, "Install revision ".$Job->{script_version});
379   my $nodelist = join(",", @node);
380
381   if (!defined $no_clear_tmp) {
382     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383
384     my $cleanpid = fork();
385     if ($cleanpid == 0)
386     {
387       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
388             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
389       exit (1);
390     }
391     while (1)
392     {
393       last if $cleanpid == waitpid (-1, WNOHANG);
394       freeze_if_want_freeze ($cleanpid);
395       select (undef, undef, undef, 0.1);
396     }
397     Log (undef, "Clean-work-dir exited $?");
398   }
399
400   # Install requested code version
401
402   my @execargs;
403   my @srunargs = ("srun",
404                   "--nodelist=$nodelist",
405                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406
407   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
408   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
409
410   my $commit;
411   my $git_archive;
412   my $treeish = $Job->{'script_version'};
413
414   # If we're running under crunch-dispatch, it will have pulled the
415   # appropriate source tree into its own repository, and given us that
416   # repo's path as $git_dir. If we're running a "local" job, and a
417   # script_version was specified, it's up to the user to provide the
418   # full path to a local repository in Job->{repository}.
419   #
420   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
421   # git-archive --remote where appropriate.
422   #
423   # TODO: Accept a locally-hosted Arvados repository by name or
424   # UUID. Use arvados.v1.repositories.list or .get to figure out the
425   # appropriate fetch-url.
426   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427
428   $ENV{"CRUNCH_SRC_URL"} = $repo;
429
430   if (-d "$repo/.git") {
431     # We were given a working directory, but we are only interested in
432     # the index.
433     $repo = "$repo/.git";
434   }
435
436   # If this looks like a subversion r#, look for it in git-svn commit messages
437
438   if ($treeish =~ m{^\d{1,4}$}) {
439     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440     chomp $gitlog;
441     if ($gitlog =~ /^[a-f0-9]{40}$/) {
442       $commit = $gitlog;
443       Log (undef, "Using commit $commit for script_version $treeish");
444     }
445   }
446
447   # If that didn't work, try asking git to look it up as a tree-ish.
448
449   if (!defined $commit) {
450     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451     chomp $found;
452     if ($found =~ /^[0-9a-f]{40}$/s) {
453       $commit = $found;
454       if ($commit ne $treeish) {
455         # Make sure we record the real commit id in the database,
456         # frozentokey, logs, etc. -- instead of an abbreviation or a
457         # branch name which can become ambiguous or point to a
458         # different commit in the future.
459         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
460         Log (undef, "Using commit $commit for tree-ish $treeish");
461         if ($commit ne $treeish) {
462           $Job->{'script_version'} = $commit;
463           !$job_has_uuid or
464               $Job->update_attributes('script_version' => $commit) or
465               croak("Error while updating job");
466         }
467       }
468     }
469   }
470
471   if (defined $commit) {
472     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
473     @execargs = ("sh", "-c",
474                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
475     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
476   }
477   else {
478     croak ("could not figure out commit id for $treeish");
479   }
480
481   my $installpid = fork();
482   if ($installpid == 0)
483   {
484     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
485     exit (1);
486   }
487   while (1)
488   {
489     last if $installpid == waitpid (-1, WNOHANG);
490     freeze_if_want_freeze ($installpid);
491     select (undef, undef, undef, 0.1);
492   }
493   Log (undef, "Install exited $?");
494 }
495
496 if (!$have_slurm)
497 {
498   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
499   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
500 }
501
502 # If this job requires a Docker image, install that.
503 my $docker_bin = "/usr/bin/docker.io";
504 my ($docker_locator, $docker_hash);
505 if ($docker_locator = $Job->{docker_image_locator}) {
506   $docker_hash = find_docker_hash($docker_locator);
507   if (!$docker_hash)
508   {
509     croak("No Docker image hash found from locator $docker_locator");
510   }
511   my $docker_install_script = qq{
512 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
513     arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
514 fi
515 };
516   my $docker_pid = fork();
517   if ($docker_pid == 0)
518   {
519     srun (["srun", "--nodelist=" . join(',', @node)],
520           ["/bin/sh", "-ec", $docker_install_script]);
521     exit ($?);
522   }
523   while (1)
524   {
525     last if $docker_pid == waitpid (-1, WNOHANG);
526     freeze_if_want_freeze ($docker_pid);
527     select (undef, undef, undef, 0.1);
528   }
529   if ($? != 0)
530   {
531     croak("Installing Docker image from $docker_locator returned exit code $?");
532   }
533 }
534
535 foreach (qw (script script_version script_parameters runtime_constraints))
536 {
537   Log (undef,
538        "$_ " .
539        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
540 }
541 foreach (split (/\n/, $Job->{knobs}))
542 {
543   Log (undef, "knob " . $_);
544 }
545
546
547
548 $main::success = undef;
549
550
551
552 ONELEVEL:
553
554 my $thisround_succeeded = 0;
555 my $thisround_failed = 0;
556 my $thisround_failed_multiple = 0;
557
558 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
559                        or $a <=> $b } @jobstep_todo;
560 my $level = $jobstep[$jobstep_todo[0]]->{level};
561 Log (undef, "start level $level");
562
563
564
565 my %proc;
566 my @freeslot = (0..$#slot);
567 my @holdslot;
568 my %reader;
569 my $progress_is_dirty = 1;
570 my $progress_stats_updated = 0;
571
572 update_progress_stats();
573
574
575
576 THISROUND:
577 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
578 {
579   my $id = $jobstep_todo[$todo_ptr];
580   my $Jobstep = $jobstep[$id];
581   if ($Jobstep->{level} != $level)
582   {
583     next;
584   }
585
586   pipe $reader{$id}, "writer" or croak ($!);
587   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
588   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
589
590   my $childslot = $freeslot[0];
591   my $childnode = $slot[$childslot]->{node};
592   my $childslotname = join (".",
593                             $slot[$childslot]->{node}->{name},
594                             $slot[$childslot]->{cpu});
595   my $childpid = fork();
596   if ($childpid == 0)
597   {
598     $SIG{'INT'} = 'DEFAULT';
599     $SIG{'QUIT'} = 'DEFAULT';
600     $SIG{'TERM'} = 'DEFAULT';
601
602     foreach (values (%reader))
603     {
604       close($_);
605     }
606     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
607     open(STDOUT,">&writer");
608     open(STDERR,">&writer");
609
610     undef $dbh;
611     undef $sth;
612
613     delete $ENV{"GNUPGHOME"};
614     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
615     $ENV{"TASK_QSEQUENCE"} = $id;
616     $ENV{"TASK_SEQUENCE"} = $level;
617     $ENV{"JOB_SCRIPT"} = $Job->{script};
618     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
619       $param =~ tr/a-z/A-Z/;
620       $ENV{"JOB_PARAMETER_$param"} = $value;
621     }
622     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
623     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
624     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
625     $ENV{"HOME"} = $ENV{"TASK_WORK"};
626     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
627     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
628     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
629     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
630
631     $ENV{"GZIP"} = "-n";
632
633     my @srunargs = (
634       "srun",
635       "--nodelist=".$childnode->{name},
636       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
637       "--job-name=$job_id.$id.$$",
638         );
639     my $build_script_to_send = "";
640     my $command =
641         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
642         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
643         ."&& cd $ENV{CRUNCH_TMP} ";
644     if ($build_script)
645     {
646       $build_script_to_send = $build_script;
647       $command .=
648           "&& perl -";
649     }
650     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
651     if ($docker_hash)
652     {
653       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
654       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
655       # Dynamically configure the container to use the host system as its
656       # DNS server.  Get the host's global addresses from the ip command,
657       # and turn them into docker --dns options using gawk.
658       $command .=
659           q{$(ip -o address show scope global |
660               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
661       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
662       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
663       $command .= "--env=\QHOME=/home/crunch\E ";
664       while (my ($env_key, $env_val) = each %ENV)
665       {
666         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
667           if ($env_key eq "TASK_WORK") {
668             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
669           }
670           elsif ($env_key eq "TASK_KEEPMOUNT") {
671             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
672           }
673           elsif ($env_key eq "CRUNCH_SRC") {
674             $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
675           }
676           else {
677             $command .= "--env=\Q$env_key=$env_val\E ";
678           }
679         }
680       }
681       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
682       $command .= "\Q$docker_hash\E ";
683       $command .= "stdbuf --output=0 --error=0 ";
684       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
685     } else {
686       # Non-docker run
687       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
688       $command .= "stdbuf --output=0 --error=0 ";
689       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
690     }
691
692     my @execargs = ('bash', '-c', $command);
693     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
694     exit (111);
695   }
696   close("writer");
697   if (!defined $childpid)
698   {
699     close $reader{$id};
700     delete $reader{$id};
701     next;
702   }
703   shift @freeslot;
704   $proc{$childpid} = { jobstep => $id,
705                        time => time,
706                        slot => $childslot,
707                        jobstepname => "$job_id.$id.$childpid",
708                      };
709   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
710   $slot[$childslot]->{pid} = $childpid;
711
712   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
713   Log ($id, "child $childpid started on $childslotname");
714   $Jobstep->{starttime} = time;
715   $Jobstep->{node} = $childnode->{name};
716   $Jobstep->{slotindex} = $childslot;
717   delete $Jobstep->{stderr};
718   delete $Jobstep->{finishtime};
719
720   splice @jobstep_todo, $todo_ptr, 1;
721   --$todo_ptr;
722
723   $progress_is_dirty = 1;
724
725   while (!@freeslot
726          ||
727          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
728   {
729     last THISROUND if $main::please_freeze;
730     if ($main::please_info)
731     {
732       $main::please_info = 0;
733       freeze();
734       collate_output();
735       save_meta(1);
736       update_progress_stats();
737     }
738     my $gotsome
739         = readfrompipes ()
740         + reapchildren ();
741     if (!$gotsome)
742     {
743       check_refresh_wanted();
744       check_squeue();
745       update_progress_stats();
746       select (undef, undef, undef, 0.1);
747     }
748     elsif (time - $progress_stats_updated >= 30)
749     {
750       update_progress_stats();
751     }
752     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
753         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
754     {
755       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
756           .($thisround_failed+$thisround_succeeded)
757           .") -- giving up on this round";
758       Log (undef, $message);
759       last THISROUND;
760     }
761
762     # move slots from freeslot to holdslot (or back to freeslot) if necessary
763     for (my $i=$#freeslot; $i>=0; $i--) {
764       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
765         push @holdslot, (splice @freeslot, $i, 1);
766       }
767     }
768     for (my $i=$#holdslot; $i>=0; $i--) {
769       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
770         push @freeslot, (splice @holdslot, $i, 1);
771       }
772     }
773
774     # give up if no nodes are succeeding
775     if (!grep { $_->{node}->{losing_streak} == 0 &&
776                     $_->{node}->{hold_count} < 4 } @slot) {
777       my $message = "Every node has failed -- giving up on this round";
778       Log (undef, $message);
779       last THISROUND;
780     }
781   }
782 }
783
784
785 push @freeslot, splice @holdslot;
786 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
787
788
789 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
790 while (%proc)
791 {
792   if ($main::please_continue) {
793     $main::please_continue = 0;
794     goto THISROUND;
795   }
796   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
797   readfrompipes ();
798   if (!reapchildren())
799   {
800     check_refresh_wanted();
801     check_squeue();
802     update_progress_stats();
803     select (undef, undef, undef, 0.1);
804     killem (keys %proc) if $main::please_freeze;
805   }
806 }
807
808 update_progress_stats();
809 freeze_if_want_freeze();
810
811
812 if (!defined $main::success)
813 {
814   if (@jobstep_todo &&
815       $thisround_succeeded == 0 &&
816       ($thisround_failed == 0 || $thisround_failed > 4))
817   {
818     my $message = "stop because $thisround_failed tasks failed and none succeeded";
819     Log (undef, $message);
820     $main::success = 0;
821   }
822   if (!@jobstep_todo)
823   {
824     $main::success = 1;
825   }
826 }
827
828 goto ONELEVEL if !defined $main::success;
829
830
831 release_allocation();
832 freeze();
833 my $collated_output = &collate_output();
834
835 if ($job_has_uuid) {
836   $Job->update_attributes('running' => 0,
837                           'success' => $collated_output && $main::success,
838                           'finished_at' => scalar gmtime)
839 }
840
841 if ($collated_output)
842 {
843   eval {
844     open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
845         or die "failed to get collated manifest: $!";
846     # Read the original manifest, and strip permission hints from it,
847     # so we can put the result in a Collection.
848     my @stripped_manifest_lines = ();
849     my $orig_manifest_text = '';
850     while (my $manifest_line = <$orig_manifest>) {
851       $orig_manifest_text .= $manifest_line;
852       my @words = split(/ /, $manifest_line, -1);
853       foreach my $ii (0..$#words) {
854         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
855           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
856         }
857       }
858       push(@stripped_manifest_lines, join(" ", @words));
859     }
860     my $stripped_manifest_text = join("", @stripped_manifest_lines);
861     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
862       'uuid' => md5_hex($stripped_manifest_text),
863       'manifest_text' => $orig_manifest_text,
864     });
865     $Job->update_attributes('output' => $output->{uuid});
866     if ($Job->{'output_is_persistent'}) {
867       $arv->{'links'}->{'create'}->execute('link' => {
868         'tail_kind' => 'arvados#user',
869         'tail_uuid' => $User->{'uuid'},
870         'head_kind' => 'arvados#collection',
871         'head_uuid' => $Job->{'output'},
872         'link_class' => 'resources',
873         'name' => 'wants',
874       });
875     }
876   };
877   if ($@) {
878     Log (undef, "Failed to register output manifest: $@");
879   }
880 }
881
882 Log (undef, "finish");
883
884 save_meta();
885 exit 0;
886
887
888
889 sub update_progress_stats
890 {
891   $progress_stats_updated = time;
892   return if !$progress_is_dirty;
893   my ($todo, $done, $running) = (scalar @jobstep_todo,
894                                  scalar @jobstep_done,
895                                  scalar @slot - scalar @freeslot - scalar @holdslot);
896   $Job->{'tasks_summary'} ||= {};
897   $Job->{'tasks_summary'}->{'todo'} = $todo;
898   $Job->{'tasks_summary'}->{'done'} = $done;
899   $Job->{'tasks_summary'}->{'running'} = $running;
900   if ($job_has_uuid) {
901     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
902   }
903   Log (undef, "status: $done done, $running running, $todo todo");
904   $progress_is_dirty = 0;
905 }
906
907
908
909 sub reapchildren
910 {
911   my $pid = waitpid (-1, WNOHANG);
912   return 0 if $pid <= 0;
913
914   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
915                   . "."
916                   . $slot[$proc{$pid}->{slot}]->{cpu});
917   my $jobstepid = $proc{$pid}->{jobstep};
918   my $elapsed = time - $proc{$pid}->{time};
919   my $Jobstep = $jobstep[$jobstepid];
920
921   my $childstatus = $?;
922   my $exitvalue = $childstatus >> 8;
923   my $exitinfo = sprintf("exit %d signal %d%s",
924                          $exitvalue,
925                          $childstatus & 127,
926                          ($childstatus & 128 ? ' core dump' : ''));
927   $Jobstep->{'arvados_task'}->reload;
928   my $task_success = $Jobstep->{'arvados_task'}->{success};
929
930   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
931
932   if (!defined $task_success) {
933     # task did not indicate one way or the other --> fail
934     $Jobstep->{'arvados_task'}->{success} = 0;
935     $Jobstep->{'arvados_task'}->save;
936     $task_success = 0;
937   }
938
939   if (!$task_success)
940   {
941     my $temporary_fail;
942     $temporary_fail ||= $Jobstep->{node_fail};
943     $temporary_fail ||= ($exitvalue == 111);
944
945     ++$thisround_failed;
946     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
947
948     # Check for signs of a failed or misconfigured node
949     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
950         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
951       # Don't count this against jobstep failure thresholds if this
952       # node is already suspected faulty and srun exited quickly
953       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
954           $elapsed < 5) {
955         Log ($jobstepid, "blaming failure on suspect node " .
956              $slot[$proc{$pid}->{slot}]->{node}->{name});
957         $temporary_fail ||= 1;
958       }
959       ban_node_by_slot($proc{$pid}->{slot});
960     }
961
962     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
963                              ++$Jobstep->{'failures'},
964                              $temporary_fail ? 'temporary ' : 'permanent',
965                              $elapsed));
966
967     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
968       # Give up on this task, and the whole job
969       $main::success = 0;
970       $main::please_freeze = 1;
971     }
972     else {
973       # Put this task back on the todo queue
974       push @jobstep_todo, $jobstepid;
975     }
976     $Job->{'tasks_summary'}->{'failed'}++;
977   }
978   else
979   {
980     ++$thisround_succeeded;
981     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
982     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
983     push @jobstep_done, $jobstepid;
984     Log ($jobstepid, "success in $elapsed seconds");
985   }
986   $Jobstep->{exitcode} = $childstatus;
987   $Jobstep->{finishtime} = time;
988   process_stderr ($jobstepid, $task_success);
989   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
990
991   close $reader{$jobstepid};
992   delete $reader{$jobstepid};
993   delete $slot[$proc{$pid}->{slot}]->{pid};
994   push @freeslot, $proc{$pid}->{slot};
995   delete $proc{$pid};
996
997   # Load new tasks
998   my $newtask_list = [];
999   my $newtask_results;
1000   do {
1001     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1002       'where' => {
1003         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1004       },
1005       'order' => 'qsequence',
1006       'offset' => scalar(@$newtask_list),
1007     );
1008     push(@$newtask_list, @{$newtask_results->{items}});
1009   } while (@{$newtask_results->{items}});
1010   foreach my $arvados_task (@$newtask_list) {
1011     my $jobstep = {
1012       'level' => $arvados_task->{'sequence'},
1013       'failures' => 0,
1014       'arvados_task' => $arvados_task
1015     };
1016     push @jobstep, $jobstep;
1017     push @jobstep_todo, $#jobstep;
1018   }
1019
1020   $progress_is_dirty = 1;
1021   1;
1022 }
1023
1024 sub check_refresh_wanted
1025 {
1026   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1027   if (@stat && $stat[9] > $latest_refresh) {
1028     $latest_refresh = scalar time;
1029     if ($job_has_uuid) {
1030       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1031       for my $attr ('cancelled_at',
1032                     'cancelled_by_user_uuid',
1033                     'cancelled_by_client_uuid') {
1034         $Job->{$attr} = $Job2->{$attr};
1035       }
1036       if ($Job->{'cancelled_at'}) {
1037         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1038              " by user " . $Job->{cancelled_by_user_uuid});
1039         $main::success = 0;
1040         $main::please_freeze = 1;
1041       }
1042     }
1043   }
1044 }
1045
1046 sub check_squeue
1047 {
1048   # return if the kill list was checked <4 seconds ago
1049   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1050   {
1051     return;
1052   }
1053   $squeue_kill_checked = time;
1054
1055   # use killem() on procs whose killtime is reached
1056   for (keys %proc)
1057   {
1058     if (exists $proc{$_}->{killtime}
1059         && $proc{$_}->{killtime} <= time)
1060     {
1061       killem ($_);
1062     }
1063   }
1064
1065   # return if the squeue was checked <60 seconds ago
1066   if (defined $squeue_checked && $squeue_checked > time - 60)
1067   {
1068     return;
1069   }
1070   $squeue_checked = time;
1071
1072   if (!$have_slurm)
1073   {
1074     # here is an opportunity to check for mysterious problems with local procs
1075     return;
1076   }
1077
1078   # get a list of steps still running
1079   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1080   chop @squeue;
1081   if ($squeue[-1] ne "ok")
1082   {
1083     return;
1084   }
1085   pop @squeue;
1086
1087   # which of my jobsteps are running, according to squeue?
1088   my %ok;
1089   foreach (@squeue)
1090   {
1091     if (/^(\d+)\.(\d+) (\S+)/)
1092     {
1093       if ($1 eq $ENV{SLURM_JOBID})
1094       {
1095         $ok{$3} = 1;
1096       }
1097     }
1098   }
1099
1100   # which of my active child procs (>60s old) were not mentioned by squeue?
1101   foreach (keys %proc)
1102   {
1103     if ($proc{$_}->{time} < time - 60
1104         && !exists $ok{$proc{$_}->{jobstepname}}
1105         && !exists $proc{$_}->{killtime})
1106     {
1107       # kill this proc if it hasn't exited in 30 seconds
1108       $proc{$_}->{killtime} = time + 30;
1109     }
1110   }
1111 }
1112
1113
1114 sub release_allocation
1115 {
1116   if ($have_slurm)
1117   {
1118     Log (undef, "release job allocation");
1119     system "scancel $ENV{SLURM_JOBID}";
1120   }
1121 }
1122
1123
1124 sub readfrompipes
1125 {
1126   my $gotsome = 0;
1127   foreach my $job (keys %reader)
1128   {
1129     my $buf;
1130     while (0 < sysread ($reader{$job}, $buf, 8192))
1131     {
1132       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1133       $jobstep[$job]->{stderr} .= $buf;
1134       preprocess_stderr ($job);
1135       if (length ($jobstep[$job]->{stderr}) > 16384)
1136       {
1137         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1138       }
1139       $gotsome = 1;
1140     }
1141   }
1142   return $gotsome;
1143 }
1144
1145
1146 sub preprocess_stderr
1147 {
1148   my $job = shift;
1149
1150   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1151     my $line = $1;
1152     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1153     Log ($job, "stderr $line");
1154     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1155       # whoa.
1156       $main::please_freeze = 1;
1157     }
1158     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1159       $jobstep[$job]->{node_fail} = 1;
1160       ban_node_by_slot($jobstep[$job]->{slotindex});
1161     }
1162   }
1163 }
1164
1165
1166 sub process_stderr
1167 {
1168   my $job = shift;
1169   my $task_success = shift;
1170   preprocess_stderr ($job);
1171
1172   map {
1173     Log ($job, "stderr $_");
1174   } split ("\n", $jobstep[$job]->{stderr});
1175 }
1176
1177 sub fetch_block
1178 {
1179   my $hash = shift;
1180   my ($keep, $child_out, $output_block);
1181
1182   my $cmd = "$arv_cli keep get \Q$hash\E";
1183   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1184   sysread($keep, $output_block, 64 * 1024 * 1024);
1185   close $keep;
1186   return $output_block;
1187 }
1188
1189 sub collate_output
1190 {
1191   Log (undef, "collate");
1192
1193   my ($child_out, $child_in);
1194   my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
1195   my $joboutput;
1196   for (@jobstep)
1197   {
1198     next if (!exists $_->{'arvados_task'}->{output} ||
1199              !$_->{'arvados_task'}->{'success'} ||
1200              $_->{'exitcode'} != 0);
1201     my $output = $_->{'arvados_task'}->{output};
1202     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1203     {
1204       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1205       print $child_in $output;
1206     }
1207     elsif (@jobstep == 1)
1208     {
1209       $joboutput = $output;
1210       last;
1211     }
1212     elsif (defined (my $outblock = fetch_block ($output)))
1213     {
1214       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1215       print $child_in $outblock;
1216     }
1217     else
1218     {
1219       Log (undef, "XXX fetch_block($output) failed XXX");
1220       $main::success = 0;
1221     }
1222   }
1223   $child_in->close;
1224
1225   if (!defined $joboutput) {
1226     my $s = IO::Select->new($child_out);
1227     if ($s->can_read(120)) {
1228       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1229       chomp($joboutput);
1230     } else {
1231       Log (undef, "timed out reading from 'arv keep put'");
1232     }
1233   }
1234   waitpid($pid, 0);
1235
1236   if ($joboutput)
1237   {
1238     Log (undef, "output $joboutput");
1239     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1240   }
1241   else
1242   {
1243     Log (undef, "output undef");
1244   }
1245   return $joboutput;
1246 }
1247
1248
1249 sub killem
1250 {
1251   foreach (@_)
1252   {
1253     my $sig = 2;                # SIGINT first
1254     if (exists $proc{$_}->{"sent_$sig"} &&
1255         time - $proc{$_}->{"sent_$sig"} > 4)
1256     {
1257       $sig = 15;                # SIGTERM if SIGINT doesn't work
1258     }
1259     if (exists $proc{$_}->{"sent_$sig"} &&
1260         time - $proc{$_}->{"sent_$sig"} > 4)
1261     {
1262       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1263     }
1264     if (!exists $proc{$_}->{"sent_$sig"})
1265     {
1266       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1267       kill $sig, $_;
1268       select (undef, undef, undef, 0.1);
1269       if ($sig == 2)
1270       {
1271         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1272       }
1273       $proc{$_}->{"sent_$sig"} = time;
1274       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1275     }
1276   }
1277 }
1278
1279
1280 sub fhbits
1281 {
1282   my($bits);
1283   for (@_) {
1284     vec($bits,fileno($_),1) = 1;
1285   }
1286   $bits;
1287 }
1288
1289
1290 sub Log                         # ($jobstep_id, $logmessage)
1291 {
1292   if ($_[1] =~ /\n/) {
1293     for my $line (split (/\n/, $_[1])) {
1294       Log ($_[0], $line);
1295     }
1296     return;
1297   }
1298   my $fh = select STDERR; $|=1; select $fh;
1299   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1300   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1301   $message .= "\n";
1302   my $datetime;
1303   if ($local_logfile || -t STDERR) {
1304     my @gmtime = gmtime;
1305     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1306                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1307   }
1308   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1309
1310   if ($local_logfile) {
1311     print $local_logfile $datetime . " " . $message;
1312   }
1313 }
1314
1315
1316 sub croak
1317 {
1318   my ($package, $file, $line) = caller;
1319   my $message = "@_ at $file line $line\n";
1320   Log (undef, $message);
1321   freeze() if @jobstep_todo;
1322   collate_output() if @jobstep_todo;
1323   cleanup();
1324   save_meta() if $local_logfile;
1325   die;
1326 }
1327
1328
1329 sub cleanup
1330 {
1331   return if !$job_has_uuid;
1332   $Job->update_attributes('running' => 0,
1333                           'success' => 0,
1334                           'finished_at' => scalar gmtime);
1335 }
1336
1337
1338 sub save_meta
1339 {
1340   my $justcheckpoint = shift; # false if this will be the last meta saved
1341   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1342
1343   $local_logfile->flush;
1344   my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1345       . quotemeta($local_logfile->filename);
1346   my $loglocator = `$cmd`;
1347   die "system $cmd failed: $?" if $?;
1348   chomp($loglocator);
1349
1350   $local_logfile = undef;   # the temp file is automatically deleted
1351   Log (undef, "log manifest is $loglocator");
1352   $Job->{'log'} = $loglocator;
1353   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1354 }
1355
1356
1357 sub freeze_if_want_freeze
1358 {
1359   if ($main::please_freeze)
1360   {
1361     release_allocation();
1362     if (@_)
1363     {
1364       # kill some srun procs before freeze+stop
1365       map { $proc{$_} = {} } @_;
1366       while (%proc)
1367       {
1368         killem (keys %proc);
1369         select (undef, undef, undef, 0.1);
1370         my $died;
1371         while (($died = waitpid (-1, WNOHANG)) > 0)
1372         {
1373           delete $proc{$died};
1374         }
1375       }
1376     }
1377     freeze();
1378     collate_output();
1379     cleanup();
1380     save_meta();
1381     exit 0;
1382   }
1383 }
1384
1385
1386 sub freeze
1387 {
1388   Log (undef, "Freeze not implemented");
1389   return;
1390 }
1391
1392
1393 sub thaw
1394 {
1395   croak ("Thaw not implemented");
1396 }
1397
1398
1399 sub freezequote
1400 {
1401   my $s = shift;
1402   $s =~ s/\\/\\\\/g;
1403   $s =~ s/\n/\\n/g;
1404   return $s;
1405 }
1406
1407
1408 sub freezeunquote
1409 {
1410   my $s = shift;
1411   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1412   return $s;
1413 }
1414
1415
1416 sub srun
1417 {
1418   my $srunargs = shift;
1419   my $execargs = shift;
1420   my $opts = shift || {};
1421   my $stdin = shift;
1422   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1423   print STDERR (join (" ",
1424                       map { / / ? "'$_'" : $_ }
1425                       (@$args)),
1426                 "\n")
1427       if $ENV{CRUNCH_DEBUG};
1428
1429   if (defined $stdin) {
1430     my $child = open STDIN, "-|";
1431     defined $child or die "no fork: $!";
1432     if ($child == 0) {
1433       print $stdin or die $!;
1434       close STDOUT or die $!;
1435       exit 0;
1436     }
1437   }
1438
1439   return system (@$args) if $opts->{fork};
1440
1441   exec @$args;
1442   warn "ENV size is ".length(join(" ",%ENV));
1443   die "exec failed: $!: @$args";
1444 }
1445
1446
1447 sub ban_node_by_slot {
1448   # Don't start any new jobsteps on this node for 60 seconds
1449   my $slotid = shift;
1450   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1451   $slot[$slotid]->{node}->{hold_count}++;
1452   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1453 }
1454
1455 sub must_lock_now
1456 {
1457   my ($lockfile, $error_message) = @_;
1458   open L, ">", $lockfile or croak("$lockfile: $!");
1459   if (!flock L, LOCK_EX|LOCK_NB) {
1460     croak("Can't lock $lockfile: $error_message\n");
1461   }
1462 }
1463
1464 sub find_docker_hash {
1465   # Given a Keep locator, search for a matching link to find the Docker hash
1466   # of the stored image.
1467   my $locator = shift;
1468   my $links_result = $arv->{links}->{list}->execute(
1469     filters => [["head_uuid", "=", $locator],
1470                 ["link_class", "=", "docker_image_hash"]],
1471     limit => 1);
1472   my $docker_hash;
1473   foreach my $link (@{$links_result->{items}}) {
1474     $docker_hash = lc($link->{name});
1475   }
1476   return $docker_hash;
1477 }
1478
1479 __DATA__
1480 #!/usr/bin/perl
1481
1482 # checkout-and-build
1483
1484 use Fcntl ':flock';
1485
1486 my $destdir = $ENV{"CRUNCH_SRC"};
1487 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1488 my $repo = $ENV{"CRUNCH_SRC_URL"};
1489
1490 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1491 flock L, LOCK_EX;
1492 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1493     exit 0;
1494 }
1495
1496 unlink "$destdir.commit";
1497 open STDOUT, ">", "$destdir.log";
1498 open STDERR, ">&STDOUT";
1499
1500 mkdir $destdir;
1501 my @git_archive_data = <DATA>;
1502 if (@git_archive_data) {
1503   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1504   print TARX @git_archive_data;
1505   if(!close(TARX)) {
1506     die "'tar -C $destdir -xf -' exited $?: $!";
1507   }
1508 }
1509
1510 my $pwd;
1511 chomp ($pwd = `pwd`);
1512 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1513 mkdir $install_dir;
1514
1515 for my $src_path ("$destdir/arvados/sdk/python") {
1516   if (-d $src_path) {
1517     shell_or_die ("virtualenv", $install_dir);
1518     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1519   }
1520 }
1521
1522 if (-e "$destdir/crunch_scripts/install") {
1523     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1524 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1525     # Old version
1526     shell_or_die ("./tests/autotests.sh", $install_dir);
1527 } elsif (-e "./install.sh") {
1528     shell_or_die ("./install.sh", $install_dir);
1529 }
1530
1531 if ($commit) {
1532     unlink "$destdir.commit.new";
1533     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1534     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1535 }
1536
1537 close L;
1538
1539 exit 0;
1540
1541 sub shell_or_die
1542 {
1543   if ($ENV{"DEBUG"}) {
1544     print STDERR "@_\n";
1545   }
1546   system (@_) == 0
1547       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1548 }
1549
1550 __DATA__