3220: Merge branch '3220-http-status-codes' closes #3220
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $force_unlock;
100 my $git_dir;
101 my $jobspec;
102 my $job_api_token;
103 my $no_clear_tmp;
104 my $resume_stash;
105 GetOptions('force-unlock' => \$force_unlock,
106            'git-dir=s' => \$git_dir,
107            'job=s' => \$jobspec,
108            'job-api-token=s' => \$job_api_token,
109            'no-clear-tmp' => \$no_clear_tmp,
110            'resume-stash=s' => \$resume_stash,
111     );
112
113 if (defined $job_api_token) {
114   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
115 }
116
117 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
118 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
119 my $local_job = !$job_has_uuid;
120
121
122 $SIG{'USR1'} = sub
123 {
124   $main::ENV{CRUNCH_DEBUG} = 1;
125 };
126 $SIG{'USR2'} = sub
127 {
128   $main::ENV{CRUNCH_DEBUG} = 0;
129 };
130
131
132
133 my $arv = Arvados->new('apiVersion' => 'v1');
134 my $local_logfile;
135
136 my $User = $arv->{'users'}->{'current'}->execute;
137
138 my $Job = {};
139 my $job_id;
140 my $dbh;
141 my $sth;
142 if ($job_has_uuid)
143 {
144   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
145   if (!$force_unlock) {
146     if ($Job->{'is_locked_by_uuid'}) {
147       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
148     }
149     if ($Job->{'success'} ne undef) {
150       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
151     }
152     if ($Job->{'running'}) {
153       croak("Job 'running' flag is already set");
154     }
155     if ($Job->{'started_at'}) {
156       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
157     }
158   }
159 }
160 else
161 {
162   $Job = JSON::decode_json($jobspec);
163
164   if (!$resume_stash)
165   {
166     map { croak ("No $_ specified") unless $Job->{$_} }
167     qw(script script_version script_parameters);
168   }
169
170   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
171   $Job->{'started_at'} = gmtime;
172
173   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
174
175   $job_has_uuid = 1;
176 }
177 $job_id = $Job->{'uuid'};
178
179 my $keep_logfile = $job_id . '.log.txt';
180 $local_logfile = File::Temp->new();
181
182 $Job->{'runtime_constraints'} ||= {};
183 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
184 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
185
186
187 Log (undef, "check slurm allocation");
188 my @slot;
189 my @node;
190 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
191 my @sinfo;
192 if (!$have_slurm)
193 {
194   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
195   push @sinfo, "$localcpus localhost";
196 }
197 if (exists $ENV{SLURM_NODELIST})
198 {
199   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
200 }
201 foreach (@sinfo)
202 {
203   my ($ncpus, $slurm_nodelist) = split;
204   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
205
206   my @nodelist;
207   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
208   {
209     my $nodelist = $1;
210     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
211     {
212       my $ranges = $1;
213       foreach (split (",", $ranges))
214       {
215         my ($a, $b);
216         if (/(\d+)-(\d+)/)
217         {
218           $a = $1;
219           $b = $2;
220         }
221         else
222         {
223           $a = $_;
224           $b = $_;
225         }
226         push @nodelist, map {
227           my $n = $nodelist;
228           $n =~ s/\[[-,\d]+\]/$_/;
229           $n;
230         } ($a..$b);
231       }
232     }
233     else
234     {
235       push @nodelist, $nodelist;
236     }
237   }
238   foreach my $nodename (@nodelist)
239   {
240     Log (undef, "node $nodename - $ncpus slots");
241     my $node = { name => $nodename,
242                  ncpus => $ncpus,
243                  losing_streak => 0,
244                  hold_until => 0 };
245     foreach my $cpu (1..$ncpus)
246     {
247       push @slot, { node => $node,
248                     cpu => $cpu };
249     }
250   }
251   push @node, @nodelist;
252 }
253
254
255
256 # Ensure that we get one jobstep running on each allocated node before
257 # we start overloading nodes with concurrent steps
258
259 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
260
261
262
263 my $jobmanager_id;
264 if ($job_has_uuid)
265 {
266   # Claim this job, and make sure nobody else does
267   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
268           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
269     croak("Error while updating / locking job");
270   }
271   $Job->update_attributes('started_at' => scalar gmtime,
272                           'running' => 1,
273                           'success' => undef,
274                           'tasks_summary' => { 'failed' => 0,
275                                                'todo' => 1,
276                                                'running' => 0,
277                                                'done' => 0 });
278 }
279
280
281 Log (undef, "start");
282 $SIG{'INT'} = sub { $main::please_freeze = 1; };
283 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
284 $SIG{'TERM'} = \&croak;
285 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
286 $SIG{'ALRM'} = sub { $main::please_info = 1; };
287 $SIG{'CONT'} = sub { $main::please_continue = 1; };
288 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
289
290 $main::please_freeze = 0;
291 $main::please_info = 0;
292 $main::please_continue = 0;
293 $main::please_refresh = 0;
294 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
295
296 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
297 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
298 $ENV{"JOB_UUID"} = $job_id;
299
300
301 my @jobstep;
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
306 my $squeue_checked;
307 my $squeue_kill_checked;
308 my $output_in_keep = 0;
309 my $latest_refresh = scalar time;
310
311
312
313 if (defined $Job->{thawedfromkey})
314 {
315   thaw ($Job->{thawedfromkey});
316 }
317 else
318 {
319   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
320     'job_uuid' => $Job->{'uuid'},
321     'sequence' => 0,
322     'qsequence' => 0,
323     'parameters' => {},
324                                                           });
325   push @jobstep, { 'level' => 0,
326                    'failures' => 0,
327                    'arvados_task' => $first_task,
328                  };
329   push @jobstep_todo, 0;
330 }
331
332
333 if (!$have_slurm)
334 {
335   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
336 }
337
338
339 my $build_script;
340
341
342 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
343
344 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
345 if ($skip_install)
346 {
347   if (!defined $no_clear_tmp) {
348     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
349     system($clear_tmp_cmd) == 0
350         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
351   }
352   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
353   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
354     if (-d $src_path) {
355       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
356           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
357       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
358           == 0
359           or croak ("setup.py in $src_path failed: exit ".($?>>8));
360     }
361   }
362 }
363 else
364 {
365   do {
366     local $/ = undef;
367     $build_script = <DATA>;
368   };
369   Log (undef, "Install revision ".$Job->{script_version});
370   my $nodelist = join(",", @node);
371
372   if (!defined $no_clear_tmp) {
373     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
374
375     my $cleanpid = fork();
376     if ($cleanpid == 0)
377     {
378       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
379             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
380       exit (1);
381     }
382     while (1)
383     {
384       last if $cleanpid == waitpid (-1, WNOHANG);
385       freeze_if_want_freeze ($cleanpid);
386       select (undef, undef, undef, 0.1);
387     }
388     Log (undef, "Clean-work-dir exited $?");
389   }
390
391   # Install requested code version
392
393   my @execargs;
394   my @srunargs = ("srun",
395                   "--nodelist=$nodelist",
396                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
397
398   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
399   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
400
401   my $commit;
402   my $git_archive;
403   my $treeish = $Job->{'script_version'};
404
405   # If we're running under crunch-dispatch, it will have pulled the
406   # appropriate source tree into its own repository, and given us that
407   # repo's path as $git_dir. If we're running a "local" job, and a
408   # script_version was specified, it's up to the user to provide the
409   # full path to a local repository in Job->{repository}.
410   #
411   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
412   # git-archive --remote where appropriate.
413   #
414   # TODO: Accept a locally-hosted Arvados repository by name or
415   # UUID. Use arvados.v1.repositories.list or .get to figure out the
416   # appropriate fetch-url.
417   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
418
419   $ENV{"CRUNCH_SRC_URL"} = $repo;
420
421   if (-d "$repo/.git") {
422     # We were given a working directory, but we are only interested in
423     # the index.
424     $repo = "$repo/.git";
425   }
426
427   # If this looks like a subversion r#, look for it in git-svn commit messages
428
429   if ($treeish =~ m{^\d{1,4}$}) {
430     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
431     chomp $gitlog;
432     if ($gitlog =~ /^[a-f0-9]{40}$/) {
433       $commit = $gitlog;
434       Log (undef, "Using commit $commit for script_version $treeish");
435     }
436   }
437
438   # If that didn't work, try asking git to look it up as a tree-ish.
439
440   if (!defined $commit) {
441     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
442     chomp $found;
443     if ($found =~ /^[0-9a-f]{40}$/s) {
444       $commit = $found;
445       if ($commit ne $treeish) {
446         # Make sure we record the real commit id in the database,
447         # frozentokey, logs, etc. -- instead of an abbreviation or a
448         # branch name which can become ambiguous or point to a
449         # different commit in the future.
450         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
451         Log (undef, "Using commit $commit for tree-ish $treeish");
452         if ($commit ne $treeish) {
453           $Job->{'script_version'} = $commit;
454           !$job_has_uuid or
455               $Job->update_attributes('script_version' => $commit) or
456               croak("Error while updating job");
457         }
458       }
459     }
460   }
461
462   if (defined $commit) {
463     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
464     @execargs = ("sh", "-c",
465                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
466     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
467   }
468   else {
469     croak ("could not figure out commit id for $treeish");
470   }
471
472   my $installpid = fork();
473   if ($installpid == 0)
474   {
475     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
476     exit (1);
477   }
478   while (1)
479   {
480     last if $installpid == waitpid (-1, WNOHANG);
481     freeze_if_want_freeze ($installpid);
482     select (undef, undef, undef, 0.1);
483   }
484   Log (undef, "Install exited $?");
485 }
486
487 if (!$have_slurm)
488 {
489   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
490   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
491 }
492
493 # If this job requires a Docker image, install that.
494 my $docker_bin = "/usr/bin/docker.io";
495 my ($docker_locator, $docker_hash);
496 if ($docker_locator = $Job->{docker_image_locator}) {
497   $docker_hash = find_docker_hash($docker_locator);
498   if (!$docker_hash)
499   {
500     croak("No Docker image hash found from locator $docker_locator");
501   }
502   my $docker_install_script = qq{
503 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
504     arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
505 fi
506 };
507   my $docker_pid = fork();
508   if ($docker_pid == 0)
509   {
510     srun (["srun", "--nodelist=" . join(',', @node)],
511           ["/bin/sh", "-ec", $docker_install_script]);
512     exit ($?);
513   }
514   while (1)
515   {
516     last if $docker_pid == waitpid (-1, WNOHANG);
517     freeze_if_want_freeze ($docker_pid);
518     select (undef, undef, undef, 0.1);
519   }
520   if ($? != 0)
521   {
522     croak("Installing Docker image from $docker_locator returned exit code $?");
523   }
524 }
525
526 foreach (qw (script script_version script_parameters runtime_constraints))
527 {
528   Log (undef,
529        "$_ " .
530        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
531 }
532 foreach (split (/\n/, $Job->{knobs}))
533 {
534   Log (undef, "knob " . $_);
535 }
536
537
538
539 $main::success = undef;
540
541
542
543 ONELEVEL:
544
545 my $thisround_succeeded = 0;
546 my $thisround_failed = 0;
547 my $thisround_failed_multiple = 0;
548
549 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
550                        or $a <=> $b } @jobstep_todo;
551 my $level = $jobstep[$jobstep_todo[0]]->{level};
552 Log (undef, "start level $level");
553
554
555
556 my %proc;
557 my @freeslot = (0..$#slot);
558 my @holdslot;
559 my %reader;
560 my $progress_is_dirty = 1;
561 my $progress_stats_updated = 0;
562
563 update_progress_stats();
564
565
566
567 THISROUND:
568 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
569 {
570   my $id = $jobstep_todo[$todo_ptr];
571   my $Jobstep = $jobstep[$id];
572   if ($Jobstep->{level} != $level)
573   {
574     next;
575   }
576
577   pipe $reader{$id}, "writer" or croak ($!);
578   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
579   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
580
581   my $childslot = $freeslot[0];
582   my $childnode = $slot[$childslot]->{node};
583   my $childslotname = join (".",
584                             $slot[$childslot]->{node}->{name},
585                             $slot[$childslot]->{cpu});
586   my $childpid = fork();
587   if ($childpid == 0)
588   {
589     $SIG{'INT'} = 'DEFAULT';
590     $SIG{'QUIT'} = 'DEFAULT';
591     $SIG{'TERM'} = 'DEFAULT';
592
593     foreach (values (%reader))
594     {
595       close($_);
596     }
597     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
598     open(STDOUT,">&writer");
599     open(STDERR,">&writer");
600
601     undef $dbh;
602     undef $sth;
603
604     delete $ENV{"GNUPGHOME"};
605     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
606     $ENV{"TASK_QSEQUENCE"} = $id;
607     $ENV{"TASK_SEQUENCE"} = $level;
608     $ENV{"JOB_SCRIPT"} = $Job->{script};
609     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
610       $param =~ tr/a-z/A-Z/;
611       $ENV{"JOB_PARAMETER_$param"} = $value;
612     }
613     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
614     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
615     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
616     $ENV{"HOME"} = $ENV{"TASK_WORK"};
617     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
618     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
619     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
620     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
621
622     $ENV{"GZIP"} = "-n";
623
624     my @srunargs = (
625       "srun",
626       "--nodelist=".$childnode->{name},
627       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
628       "--job-name=$job_id.$id.$$",
629         );
630     my $build_script_to_send = "";
631     my $command =
632         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
633         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
634         ."&& cd $ENV{CRUNCH_TMP} ";
635     if ($build_script)
636     {
637       $build_script_to_send = $build_script;
638       $command .=
639           "&& perl -";
640     }
641     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
642     if ($docker_hash)
643     {
644       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
645       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
646       # Dynamically configure the container to use the host system as its
647       # DNS server.  Get the host's global addresses from the ip command,
648       # and turn them into docker --dns options using gawk.
649       $command .=
650           q{$(ip -o address show scope global |
651               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
652       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
653       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
654       $command .= "--env=\QHOME=/home/crunch\E ";
655       while (my ($env_key, $env_val) = each %ENV)
656       {
657         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
658           if ($env_key eq "TASK_WORK") {
659             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
660           }
661           elsif ($env_key eq "TASK_KEEPMOUNT") {
662             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
663           }
664           elsif ($env_key eq "CRUNCH_SRC") {
665             $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
666           }
667           else {
668             $command .= "--env=\Q$env_key=$env_val\E ";
669           }
670         }
671       }
672       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
673       $command .= "\Q$docker_hash\E ";
674       $command .= "stdbuf --output=0 --error=0 ";
675       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
676     } else {
677       # Non-docker run
678       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
679       $command .= "stdbuf --output=0 --error=0 ";
680       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
681     }
682
683     my @execargs = ('bash', '-c', $command);
684     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
685     exit (111);
686   }
687   close("writer");
688   if (!defined $childpid)
689   {
690     close $reader{$id};
691     delete $reader{$id};
692     next;
693   }
694   shift @freeslot;
695   $proc{$childpid} = { jobstep => $id,
696                        time => time,
697                        slot => $childslot,
698                        jobstepname => "$job_id.$id.$childpid",
699                      };
700   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
701   $slot[$childslot]->{pid} = $childpid;
702
703   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
704   Log ($id, "child $childpid started on $childslotname");
705   $Jobstep->{starttime} = time;
706   $Jobstep->{node} = $childnode->{name};
707   $Jobstep->{slotindex} = $childslot;
708   delete $Jobstep->{stderr};
709   delete $Jobstep->{finishtime};
710
711   splice @jobstep_todo, $todo_ptr, 1;
712   --$todo_ptr;
713
714   $progress_is_dirty = 1;
715
716   while (!@freeslot
717          ||
718          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
719   {
720     last THISROUND if $main::please_freeze;
721     if ($main::please_info)
722     {
723       $main::please_info = 0;
724       freeze();
725       collate_output();
726       save_meta(1);
727       update_progress_stats();
728     }
729     my $gotsome
730         = readfrompipes ()
731         + reapchildren ();
732     if (!$gotsome)
733     {
734       check_refresh_wanted();
735       check_squeue();
736       update_progress_stats();
737       select (undef, undef, undef, 0.1);
738     }
739     elsif (time - $progress_stats_updated >= 30)
740     {
741       update_progress_stats();
742     }
743     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
744         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
745     {
746       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
747           .($thisround_failed+$thisround_succeeded)
748           .") -- giving up on this round";
749       Log (undef, $message);
750       last THISROUND;
751     }
752
753     # move slots from freeslot to holdslot (or back to freeslot) if necessary
754     for (my $i=$#freeslot; $i>=0; $i--) {
755       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
756         push @holdslot, (splice @freeslot, $i, 1);
757       }
758     }
759     for (my $i=$#holdslot; $i>=0; $i--) {
760       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
761         push @freeslot, (splice @holdslot, $i, 1);
762       }
763     }
764
765     # give up if no nodes are succeeding
766     if (!grep { $_->{node}->{losing_streak} == 0 &&
767                     $_->{node}->{hold_count} < 4 } @slot) {
768       my $message = "Every node has failed -- giving up on this round";
769       Log (undef, $message);
770       last THISROUND;
771     }
772   }
773 }
774
775
776 push @freeslot, splice @holdslot;
777 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
778
779
780 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
781 while (%proc)
782 {
783   if ($main::please_continue) {
784     $main::please_continue = 0;
785     goto THISROUND;
786   }
787   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
788   readfrompipes ();
789   if (!reapchildren())
790   {
791     check_refresh_wanted();
792     check_squeue();
793     update_progress_stats();
794     select (undef, undef, undef, 0.1);
795     killem (keys %proc) if $main::please_freeze;
796   }
797 }
798
799 update_progress_stats();
800 freeze_if_want_freeze();
801
802
803 if (!defined $main::success)
804 {
805   if (@jobstep_todo &&
806       $thisround_succeeded == 0 &&
807       ($thisround_failed == 0 || $thisround_failed > 4))
808   {
809     my $message = "stop because $thisround_failed tasks failed and none succeeded";
810     Log (undef, $message);
811     $main::success = 0;
812   }
813   if (!@jobstep_todo)
814   {
815     $main::success = 1;
816   }
817 }
818
819 goto ONELEVEL if !defined $main::success;
820
821
822 release_allocation();
823 freeze();
824 my $collated_output = &collate_output();
825
826 if ($job_has_uuid) {
827   $Job->update_attributes('running' => 0,
828                           'success' => $collated_output && $main::success,
829                           'finished_at' => scalar gmtime)
830 }
831
832 if ($collated_output)
833 {
834   eval {
835     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
836         or die "failed to get collated manifest: $!";
837     # Read the original manifest, and strip permission hints from it,
838     # so we can put the result in a Collection.
839     my @stripped_manifest_lines = ();
840     my $orig_manifest_text = '';
841     while (my $manifest_line = <$orig_manifest>) {
842       $orig_manifest_text .= $manifest_line;
843       my @words = split(/ /, $manifest_line, -1);
844       foreach my $ii (0..$#words) {
845         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
846           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
847         }
848       }
849       push(@stripped_manifest_lines, join(" ", @words));
850     }
851     my $stripped_manifest_text = join("", @stripped_manifest_lines);
852     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
853       'uuid' => md5_hex($stripped_manifest_text),
854       'manifest_text' => $orig_manifest_text,
855     });
856     $Job->update_attributes('output' => $output->{uuid});
857     if ($Job->{'output_is_persistent'}) {
858       $arv->{'links'}->{'create'}->execute('link' => {
859         'tail_kind' => 'arvados#user',
860         'tail_uuid' => $User->{'uuid'},
861         'head_kind' => 'arvados#collection',
862         'head_uuid' => $Job->{'output'},
863         'link_class' => 'resources',
864         'name' => 'wants',
865       });
866     }
867   };
868   if ($@) {
869     Log (undef, "Failed to register output manifest: $@");
870   }
871 }
872
873 Log (undef, "finish");
874
875 save_meta();
876 exit 0;
877
878
879
880 sub update_progress_stats
881 {
882   $progress_stats_updated = time;
883   return if !$progress_is_dirty;
884   my ($todo, $done, $running) = (scalar @jobstep_todo,
885                                  scalar @jobstep_done,
886                                  scalar @slot - scalar @freeslot - scalar @holdslot);
887   $Job->{'tasks_summary'} ||= {};
888   $Job->{'tasks_summary'}->{'todo'} = $todo;
889   $Job->{'tasks_summary'}->{'done'} = $done;
890   $Job->{'tasks_summary'}->{'running'} = $running;
891   if ($job_has_uuid) {
892     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
893   }
894   Log (undef, "status: $done done, $running running, $todo todo");
895   $progress_is_dirty = 0;
896 }
897
898
899
900 sub reapchildren
901 {
902   my $pid = waitpid (-1, WNOHANG);
903   return 0 if $pid <= 0;
904
905   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
906                   . "."
907                   . $slot[$proc{$pid}->{slot}]->{cpu});
908   my $jobstepid = $proc{$pid}->{jobstep};
909   my $elapsed = time - $proc{$pid}->{time};
910   my $Jobstep = $jobstep[$jobstepid];
911
912   my $childstatus = $?;
913   my $exitvalue = $childstatus >> 8;
914   my $exitinfo = sprintf("exit %d signal %d%s",
915                          $exitvalue,
916                          $childstatus & 127,
917                          ($childstatus & 128 ? ' core dump' : ''));
918   $Jobstep->{'arvados_task'}->reload;
919   my $task_success = $Jobstep->{'arvados_task'}->{success};
920
921   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
922
923   if (!defined $task_success) {
924     # task did not indicate one way or the other --> fail
925     $Jobstep->{'arvados_task'}->{success} = 0;
926     $Jobstep->{'arvados_task'}->save;
927     $task_success = 0;
928   }
929
930   if (!$task_success)
931   {
932     my $temporary_fail;
933     $temporary_fail ||= $Jobstep->{node_fail};
934     $temporary_fail ||= ($exitvalue == 111);
935
936     ++$thisround_failed;
937     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
938
939     # Check for signs of a failed or misconfigured node
940     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
941         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
942       # Don't count this against jobstep failure thresholds if this
943       # node is already suspected faulty and srun exited quickly
944       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
945           $elapsed < 5) {
946         Log ($jobstepid, "blaming failure on suspect node " .
947              $slot[$proc{$pid}->{slot}]->{node}->{name});
948         $temporary_fail ||= 1;
949       }
950       ban_node_by_slot($proc{$pid}->{slot});
951     }
952
953     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
954                              ++$Jobstep->{'failures'},
955                              $temporary_fail ? 'temporary ' : 'permanent',
956                              $elapsed));
957
958     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
959       # Give up on this task, and the whole job
960       $main::success = 0;
961       $main::please_freeze = 1;
962     }
963     else {
964       # Put this task back on the todo queue
965       push @jobstep_todo, $jobstepid;
966     }
967     $Job->{'tasks_summary'}->{'failed'}++;
968   }
969   else
970   {
971     ++$thisround_succeeded;
972     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
973     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
974     push @jobstep_done, $jobstepid;
975     Log ($jobstepid, "success in $elapsed seconds");
976   }
977   $Jobstep->{exitcode} = $childstatus;
978   $Jobstep->{finishtime} = time;
979   process_stderr ($jobstepid, $task_success);
980   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
981
982   close $reader{$jobstepid};
983   delete $reader{$jobstepid};
984   delete $slot[$proc{$pid}->{slot}]->{pid};
985   push @freeslot, $proc{$pid}->{slot};
986   delete $proc{$pid};
987
988   if ($task_success) {
989     # Load new tasks
990     my $newtask_list = [];
991     my $newtask_results;
992     do {
993       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
994         'where' => {
995           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
996         },
997         'order' => 'qsequence',
998         'offset' => scalar(@$newtask_list),
999       );
1000       push(@$newtask_list, @{$newtask_results->{items}});
1001     } while (@{$newtask_results->{items}});
1002     foreach my $arvados_task (@$newtask_list) {
1003       my $jobstep = {
1004         'level' => $arvados_task->{'sequence'},
1005         'failures' => 0,
1006         'arvados_task' => $arvados_task
1007       };
1008       push @jobstep, $jobstep;
1009       push @jobstep_todo, $#jobstep;
1010     }
1011   }
1012
1013   $progress_is_dirty = 1;
1014   1;
1015 }
1016
1017 sub check_refresh_wanted
1018 {
1019   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1020   if (@stat && $stat[9] > $latest_refresh) {
1021     $latest_refresh = scalar time;
1022     if ($job_has_uuid) {
1023       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1024       for my $attr ('cancelled_at',
1025                     'cancelled_by_user_uuid',
1026                     'cancelled_by_client_uuid') {
1027         $Job->{$attr} = $Job2->{$attr};
1028       }
1029       if ($Job->{'cancelled_at'}) {
1030         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1031              " by user " . $Job->{cancelled_by_user_uuid});
1032         $main::success = 0;
1033         $main::please_freeze = 1;
1034       }
1035     }
1036   }
1037 }
1038
1039 sub check_squeue
1040 {
1041   # return if the kill list was checked <4 seconds ago
1042   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1043   {
1044     return;
1045   }
1046   $squeue_kill_checked = time;
1047
1048   # use killem() on procs whose killtime is reached
1049   for (keys %proc)
1050   {
1051     if (exists $proc{$_}->{killtime}
1052         && $proc{$_}->{killtime} <= time)
1053     {
1054       killem ($_);
1055     }
1056   }
1057
1058   # return if the squeue was checked <60 seconds ago
1059   if (defined $squeue_checked && $squeue_checked > time - 60)
1060   {
1061     return;
1062   }
1063   $squeue_checked = time;
1064
1065   if (!$have_slurm)
1066   {
1067     # here is an opportunity to check for mysterious problems with local procs
1068     return;
1069   }
1070
1071   # get a list of steps still running
1072   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1073   chop @squeue;
1074   if ($squeue[-1] ne "ok")
1075   {
1076     return;
1077   }
1078   pop @squeue;
1079
1080   # which of my jobsteps are running, according to squeue?
1081   my %ok;
1082   foreach (@squeue)
1083   {
1084     if (/^(\d+)\.(\d+) (\S+)/)
1085     {
1086       if ($1 eq $ENV{SLURM_JOBID})
1087       {
1088         $ok{$3} = 1;
1089       }
1090     }
1091   }
1092
1093   # which of my active child procs (>60s old) were not mentioned by squeue?
1094   foreach (keys %proc)
1095   {
1096     if ($proc{$_}->{time} < time - 60
1097         && !exists $ok{$proc{$_}->{jobstepname}}
1098         && !exists $proc{$_}->{killtime})
1099     {
1100       # kill this proc if it hasn't exited in 30 seconds
1101       $proc{$_}->{killtime} = time + 30;
1102     }
1103   }
1104 }
1105
1106
1107 sub release_allocation
1108 {
1109   if ($have_slurm)
1110   {
1111     Log (undef, "release job allocation");
1112     system "scancel $ENV{SLURM_JOBID}";
1113   }
1114 }
1115
1116
1117 sub readfrompipes
1118 {
1119   my $gotsome = 0;
1120   foreach my $job (keys %reader)
1121   {
1122     my $buf;
1123     while (0 < sysread ($reader{$job}, $buf, 8192))
1124     {
1125       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1126       $jobstep[$job]->{stderr} .= $buf;
1127       preprocess_stderr ($job);
1128       if (length ($jobstep[$job]->{stderr}) > 16384)
1129       {
1130         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1131       }
1132       $gotsome = 1;
1133     }
1134   }
1135   return $gotsome;
1136 }
1137
1138
1139 sub preprocess_stderr
1140 {
1141   my $job = shift;
1142
1143   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1144     my $line = $1;
1145     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1146     Log ($job, "stderr $line");
1147     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1148       # whoa.
1149       $main::please_freeze = 1;
1150     }
1151     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1152       $jobstep[$job]->{node_fail} = 1;
1153       ban_node_by_slot($jobstep[$job]->{slotindex});
1154     }
1155   }
1156 }
1157
1158
1159 sub process_stderr
1160 {
1161   my $job = shift;
1162   my $task_success = shift;
1163   preprocess_stderr ($job);
1164
1165   map {
1166     Log ($job, "stderr $_");
1167   } split ("\n", $jobstep[$job]->{stderr});
1168 }
1169
1170 sub fetch_block
1171 {
1172   my $hash = shift;
1173   my ($keep, $child_out, $output_block);
1174
1175   my $cmd = "arv-get \Q$hash\E";
1176   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1177   sysread($keep, $output_block, 64 * 1024 * 1024);
1178   close $keep;
1179   return $output_block;
1180 }
1181
1182 sub collate_output
1183 {
1184   Log (undef, "collate");
1185
1186   my ($child_out, $child_in);
1187   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1188   my $joboutput;
1189   for (@jobstep)
1190   {
1191     next if (!exists $_->{'arvados_task'}->{output} ||
1192              !$_->{'arvados_task'}->{'success'} ||
1193              $_->{'exitcode'} != 0);
1194     my $output = $_->{'arvados_task'}->{output};
1195     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1196     {
1197       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1198       print $child_in $output;
1199     }
1200     elsif (@jobstep == 1)
1201     {
1202       $joboutput = $output;
1203       last;
1204     }
1205     elsif (defined (my $outblock = fetch_block ($output)))
1206     {
1207       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1208       print $child_in $outblock;
1209     }
1210     else
1211     {
1212       Log (undef, "XXX fetch_block($output) failed XXX");
1213       $main::success = 0;
1214     }
1215   }
1216   $child_in->close;
1217
1218   if (!defined $joboutput) {
1219     my $s = IO::Select->new($child_out);
1220     if ($s->can_read(120)) {
1221       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1222       chomp($joboutput);
1223     } else {
1224       Log (undef, "timed out reading from 'arv-put'");
1225     }
1226   }
1227   waitpid($pid, 0);
1228
1229   if ($joboutput)
1230   {
1231     Log (undef, "output $joboutput");
1232     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1233   }
1234   else
1235   {
1236     Log (undef, "output undef");
1237   }
1238   return $joboutput;
1239 }
1240
1241
1242 sub killem
1243 {
1244   foreach (@_)
1245   {
1246     my $sig = 2;                # SIGINT first
1247     if (exists $proc{$_}->{"sent_$sig"} &&
1248         time - $proc{$_}->{"sent_$sig"} > 4)
1249     {
1250       $sig = 15;                # SIGTERM if SIGINT doesn't work
1251     }
1252     if (exists $proc{$_}->{"sent_$sig"} &&
1253         time - $proc{$_}->{"sent_$sig"} > 4)
1254     {
1255       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1256     }
1257     if (!exists $proc{$_}->{"sent_$sig"})
1258     {
1259       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1260       kill $sig, $_;
1261       select (undef, undef, undef, 0.1);
1262       if ($sig == 2)
1263       {
1264         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1265       }
1266       $proc{$_}->{"sent_$sig"} = time;
1267       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1268     }
1269   }
1270 }
1271
1272
1273 sub fhbits
1274 {
1275   my($bits);
1276   for (@_) {
1277     vec($bits,fileno($_),1) = 1;
1278   }
1279   $bits;
1280 }
1281
1282
1283 sub Log                         # ($jobstep_id, $logmessage)
1284 {
1285   if ($_[1] =~ /\n/) {
1286     for my $line (split (/\n/, $_[1])) {
1287       Log ($_[0], $line);
1288     }
1289     return;
1290   }
1291   my $fh = select STDERR; $|=1; select $fh;
1292   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1293   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1294   $message .= "\n";
1295   my $datetime;
1296   if ($local_logfile || -t STDERR) {
1297     my @gmtime = gmtime;
1298     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1299                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1300   }
1301   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1302
1303   if ($local_logfile) {
1304     print $local_logfile $datetime . " " . $message;
1305   }
1306 }
1307
1308
1309 sub croak
1310 {
1311   my ($package, $file, $line) = caller;
1312   my $message = "@_ at $file line $line\n";
1313   Log (undef, $message);
1314   freeze() if @jobstep_todo;
1315   collate_output() if @jobstep_todo;
1316   cleanup();
1317   save_meta() if $local_logfile;
1318   die;
1319 }
1320
1321
1322 sub cleanup
1323 {
1324   return if !$job_has_uuid;
1325   $Job->update_attributes('running' => 0,
1326                           'success' => 0,
1327                           'finished_at' => scalar gmtime);
1328 }
1329
1330
1331 sub save_meta
1332 {
1333   my $justcheckpoint = shift; # false if this will be the last meta saved
1334   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1335
1336   $local_logfile->flush;
1337   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1338       . quotemeta($local_logfile->filename);
1339   my $loglocator = `$cmd`;
1340   die "system $cmd failed: $?" if $?;
1341   chomp($loglocator);
1342
1343   $local_logfile = undef;   # the temp file is automatically deleted
1344   Log (undef, "log manifest is $loglocator");
1345   $Job->{'log'} = $loglocator;
1346   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1347 }
1348
1349
1350 sub freeze_if_want_freeze
1351 {
1352   if ($main::please_freeze)
1353   {
1354     release_allocation();
1355     if (@_)
1356     {
1357       # kill some srun procs before freeze+stop
1358       map { $proc{$_} = {} } @_;
1359       while (%proc)
1360       {
1361         killem (keys %proc);
1362         select (undef, undef, undef, 0.1);
1363         my $died;
1364         while (($died = waitpid (-1, WNOHANG)) > 0)
1365         {
1366           delete $proc{$died};
1367         }
1368       }
1369     }
1370     freeze();
1371     collate_output();
1372     cleanup();
1373     save_meta();
1374     exit 0;
1375   }
1376 }
1377
1378
1379 sub freeze
1380 {
1381   Log (undef, "Freeze not implemented");
1382   return;
1383 }
1384
1385
1386 sub thaw
1387 {
1388   croak ("Thaw not implemented");
1389 }
1390
1391
1392 sub freezequote
1393 {
1394   my $s = shift;
1395   $s =~ s/\\/\\\\/g;
1396   $s =~ s/\n/\\n/g;
1397   return $s;
1398 }
1399
1400
1401 sub freezeunquote
1402 {
1403   my $s = shift;
1404   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1405   return $s;
1406 }
1407
1408
1409 sub srun
1410 {
1411   my $srunargs = shift;
1412   my $execargs = shift;
1413   my $opts = shift || {};
1414   my $stdin = shift;
1415   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1416   print STDERR (join (" ",
1417                       map { / / ? "'$_'" : $_ }
1418                       (@$args)),
1419                 "\n")
1420       if $ENV{CRUNCH_DEBUG};
1421
1422   if (defined $stdin) {
1423     my $child = open STDIN, "-|";
1424     defined $child or die "no fork: $!";
1425     if ($child == 0) {
1426       print $stdin or die $!;
1427       close STDOUT or die $!;
1428       exit 0;
1429     }
1430   }
1431
1432   return system (@$args) if $opts->{fork};
1433
1434   exec @$args;
1435   warn "ENV size is ".length(join(" ",%ENV));
1436   die "exec failed: $!: @$args";
1437 }
1438
1439
1440 sub ban_node_by_slot {
1441   # Don't start any new jobsteps on this node for 60 seconds
1442   my $slotid = shift;
1443   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1444   $slot[$slotid]->{node}->{hold_count}++;
1445   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1446 }
1447
1448 sub must_lock_now
1449 {
1450   my ($lockfile, $error_message) = @_;
1451   open L, ">", $lockfile or croak("$lockfile: $!");
1452   if (!flock L, LOCK_EX|LOCK_NB) {
1453     croak("Can't lock $lockfile: $error_message\n");
1454   }
1455 }
1456
1457 sub find_docker_hash {
1458   # Given a Keep locator, search for a matching link to find the Docker hash
1459   # of the stored image.
1460   my $locator = shift;
1461   my $links_result = $arv->{links}->{list}->execute(
1462     filters => [["head_uuid", "=", $locator],
1463                 ["link_class", "=", "docker_image_hash"]],
1464     limit => 1);
1465   my $docker_hash;
1466   foreach my $link (@{$links_result->{items}}) {
1467     $docker_hash = lc($link->{name});
1468   }
1469   return $docker_hash;
1470 }
1471
1472 __DATA__
1473 #!/usr/bin/perl
1474
1475 # checkout-and-build
1476
1477 use Fcntl ':flock';
1478
1479 my $destdir = $ENV{"CRUNCH_SRC"};
1480 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1481 my $repo = $ENV{"CRUNCH_SRC_URL"};
1482
1483 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1484 flock L, LOCK_EX;
1485 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1486     exit 0;
1487 }
1488
1489 unlink "$destdir.commit";
1490 open STDOUT, ">", "$destdir.log";
1491 open STDERR, ">&STDOUT";
1492
1493 mkdir $destdir;
1494 my @git_archive_data = <DATA>;
1495 if (@git_archive_data) {
1496   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1497   print TARX @git_archive_data;
1498   if(!close(TARX)) {
1499     die "'tar -C $destdir -xf -' exited $?: $!";
1500   }
1501 }
1502
1503 my $pwd;
1504 chomp ($pwd = `pwd`);
1505 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1506 mkdir $install_dir;
1507
1508 for my $src_path ("$destdir/arvados/sdk/python") {
1509   if (-d $src_path) {
1510     shell_or_die ("virtualenv", $install_dir);
1511     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1512   }
1513 }
1514
1515 if (-e "$destdir/crunch_scripts/install") {
1516     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1517 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1518     # Old version
1519     shell_or_die ("./tests/autotests.sh", $install_dir);
1520 } elsif (-e "./install.sh") {
1521     shell_or_die ("./install.sh", $install_dir);
1522 }
1523
1524 if ($commit) {
1525     unlink "$destdir.commit.new";
1526     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1527     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1528 }
1529
1530 close L;
1531
1532 exit 0;
1533
1534 sub shell_or_die
1535 {
1536   if ($ENV{"DEBUG"}) {
1537     print STDERR "@_\n";
1538   }
1539   system (@_) == 0
1540       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1541 }
1542
1543 __DATA__