Merge branch 'master' into 2380-ssh-doc
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85
86 $ENV{"TMPDIR"} ||= "/tmp";
87 unless (defined $ENV{"CRUNCH_TMP"}) {
88   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
89   if ($ENV{"USER"} ne "crunch" && $< != 0) {
90     # use a tmp dir unique for my uid
91     $ENV{"CRUNCH_TMP"} .= "-$<";
92   }
93 }
94 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
95 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
96 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
97 mkdir ($ENV{"JOB_WORK"});
98
99 my $force_unlock;
100 my $git_dir;
101 my $jobspec;
102 my $job_api_token;
103 my $no_clear_tmp;
104 my $resume_stash;
105 GetOptions('force-unlock' => \$force_unlock,
106            'git-dir=s' => \$git_dir,
107            'job=s' => \$jobspec,
108            'job-api-token=s' => \$job_api_token,
109            'no-clear-tmp' => \$no_clear_tmp,
110            'resume-stash=s' => \$resume_stash,
111     );
112
113 if (defined $job_api_token) {
114   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
115 }
116
117 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
118 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
119 my $local_job = !$job_has_uuid;
120
121
122 $SIG{'USR1'} = sub
123 {
124   $main::ENV{CRUNCH_DEBUG} = 1;
125 };
126 $SIG{'USR2'} = sub
127 {
128   $main::ENV{CRUNCH_DEBUG} = 0;
129 };
130
131
132
133 my $arv = Arvados->new('apiVersion' => 'v1');
134 my $local_logfile;
135
136 my $User = $arv->{'users'}->{'current'}->execute;
137
138 my $Job = {};
139 my $job_id;
140 my $dbh;
141 my $sth;
142 if ($job_has_uuid)
143 {
144   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
145   if (!$force_unlock) {
146     if ($Job->{'is_locked_by_uuid'}) {
147       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
148     }
149     if ($Job->{'success'} ne undef) {
150       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
151     }
152     if ($Job->{'running'}) {
153       croak("Job 'running' flag is already set");
154     }
155     if ($Job->{'started_at'}) {
156       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
157     }
158   }
159 }
160 else
161 {
162   $Job = JSON::decode_json($jobspec);
163
164   if (!$resume_stash)
165   {
166     map { croak ("No $_ specified") unless $Job->{$_} }
167     qw(script script_version script_parameters);
168   }
169
170   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
171   $Job->{'started_at'} = gmtime;
172
173   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
174
175   $job_has_uuid = 1;
176 }
177 $job_id = $Job->{'uuid'};
178
179 my $keep_logfile = $job_id . '.log.txt';
180 $local_logfile = File::Temp->new();
181
182 $Job->{'runtime_constraints'} ||= {};
183 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
184 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
185
186
187 Log (undef, "check slurm allocation");
188 my @slot;
189 my @node;
190 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
191 my @sinfo;
192 if (!$have_slurm)
193 {
194   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
195   push @sinfo, "$localcpus localhost";
196 }
197 if (exists $ENV{SLURM_NODELIST})
198 {
199   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
200 }
201 foreach (@sinfo)
202 {
203   my ($ncpus, $slurm_nodelist) = split;
204   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
205
206   my @nodelist;
207   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
208   {
209     my $nodelist = $1;
210     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
211     {
212       my $ranges = $1;
213       foreach (split (",", $ranges))
214       {
215         my ($a, $b);
216         if (/(\d+)-(\d+)/)
217         {
218           $a = $1;
219           $b = $2;
220         }
221         else
222         {
223           $a = $_;
224           $b = $_;
225         }
226         push @nodelist, map {
227           my $n = $nodelist;
228           $n =~ s/\[[-,\d]+\]/$_/;
229           $n;
230         } ($a..$b);
231       }
232     }
233     else
234     {
235       push @nodelist, $nodelist;
236     }
237   }
238   foreach my $nodename (@nodelist)
239   {
240     Log (undef, "node $nodename - $ncpus slots");
241     my $node = { name => $nodename,
242                  ncpus => $ncpus,
243                  losing_streak => 0,
244                  hold_until => 0 };
245     foreach my $cpu (1..$ncpus)
246     {
247       push @slot, { node => $node,
248                     cpu => $cpu };
249     }
250   }
251   push @node, @nodelist;
252 }
253
254
255
256 # Ensure that we get one jobstep running on each allocated node before
257 # we start overloading nodes with concurrent steps
258
259 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
260
261
262
263 my $jobmanager_id;
264 if ($job_has_uuid)
265 {
266   # Claim this job, and make sure nobody else does
267   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
268           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
269     croak("Error while updating / locking job");
270   }
271   $Job->update_attributes('started_at' => scalar gmtime,
272                           'running' => 1,
273                           'success' => undef,
274                           'tasks_summary' => { 'failed' => 0,
275                                                'todo' => 1,
276                                                'running' => 0,
277                                                'done' => 0 });
278 }
279
280
281 Log (undef, "start");
282 $SIG{'INT'} = sub { $main::please_freeze = 1; };
283 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
284 $SIG{'TERM'} = \&croak;
285 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
286 $SIG{'ALRM'} = sub { $main::please_info = 1; };
287 $SIG{'CONT'} = sub { $main::please_continue = 1; };
288 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
289
290 $main::please_freeze = 0;
291 $main::please_info = 0;
292 $main::please_continue = 0;
293 $main::please_refresh = 0;
294 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
295
296 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
297 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
298 $ENV{"JOB_UUID"} = $job_id;
299
300
301 my @jobstep;
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
306 my $squeue_checked;
307 my $squeue_kill_checked;
308 my $output_in_keep = 0;
309 my $latest_refresh = scalar time;
310
311
312
313 if (defined $Job->{thawedfromkey})
314 {
315   thaw ($Job->{thawedfromkey});
316 }
317 else
318 {
319   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
320     'job_uuid' => $Job->{'uuid'},
321     'sequence' => 0,
322     'qsequence' => 0,
323     'parameters' => {},
324                                                           });
325   push @jobstep, { 'level' => 0,
326                    'failures' => 0,
327                    'arvados_task' => $first_task,
328                  };
329   push @jobstep_todo, 0;
330 }
331
332
333 if (!$have_slurm)
334 {
335   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
336 }
337
338
339 my $build_script;
340
341
342 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
343
344 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
345 if ($skip_install)
346 {
347   if (!defined $no_clear_tmp) {
348     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
349     system($clear_tmp_cmd) == 0
350         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
351   }
352   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
353   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
354     if (-d $src_path) {
355       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
356           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
357       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
358           == 0
359           or croak ("setup.py in $src_path failed: exit ".($?>>8));
360     }
361   }
362 }
363 else
364 {
365   do {
366     local $/ = undef;
367     $build_script = <DATA>;
368   };
369   Log (undef, "Install revision ".$Job->{script_version});
370   my $nodelist = join(",", @node);
371
372   if (!defined $no_clear_tmp) {
373     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
374
375     my $cleanpid = fork();
376     if ($cleanpid == 0)
377     {
378       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
379             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
380       exit (1);
381     }
382     while (1)
383     {
384       last if $cleanpid == waitpid (-1, WNOHANG);
385       freeze_if_want_freeze ($cleanpid);
386       select (undef, undef, undef, 0.1);
387     }
388     Log (undef, "Clean-work-dir exited $?");
389   }
390
391   # Install requested code version
392
393   my @execargs;
394   my @srunargs = ("srun",
395                   "--nodelist=$nodelist",
396                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
397
398   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
399   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
400
401   my $commit;
402   my $git_archive;
403   my $treeish = $Job->{'script_version'};
404
405   # If we're running under crunch-dispatch, it will have pulled the
406   # appropriate source tree into its own repository, and given us that
407   # repo's path as $git_dir. If we're running a "local" job, and a
408   # script_version was specified, it's up to the user to provide the
409   # full path to a local repository in Job->{repository}.
410   #
411   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
412   # git-archive --remote where appropriate.
413   #
414   # TODO: Accept a locally-hosted Arvados repository by name or
415   # UUID. Use arvados.v1.repositories.list or .get to figure out the
416   # appropriate fetch-url.
417   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
418
419   $ENV{"CRUNCH_SRC_URL"} = $repo;
420
421   if (-d "$repo/.git") {
422     # We were given a working directory, but we are only interested in
423     # the index.
424     $repo = "$repo/.git";
425   }
426
427   # If this looks like a subversion r#, look for it in git-svn commit messages
428
429   if ($treeish =~ m{^\d{1,4}$}) {
430     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
431     chomp $gitlog;
432     if ($gitlog =~ /^[a-f0-9]{40}$/) {
433       $commit = $gitlog;
434       Log (undef, "Using commit $commit for script_version $treeish");
435     }
436   }
437
438   # If that didn't work, try asking git to look it up as a tree-ish.
439
440   if (!defined $commit) {
441     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
442     chomp $found;
443     if ($found =~ /^[0-9a-f]{40}$/s) {
444       $commit = $found;
445       if ($commit ne $treeish) {
446         # Make sure we record the real commit id in the database,
447         # frozentokey, logs, etc. -- instead of an abbreviation or a
448         # branch name which can become ambiguous or point to a
449         # different commit in the future.
450         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
451         Log (undef, "Using commit $commit for tree-ish $treeish");
452         if ($commit ne $treeish) {
453           $Job->{'script_version'} = $commit;
454           !$job_has_uuid or
455               $Job->update_attributes('script_version' => $commit) or
456               croak("Error while updating job");
457         }
458       }
459     }
460   }
461
462   if (defined $commit) {
463     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
464     @execargs = ("sh", "-c",
465                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
466     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
467   }
468   else {
469     croak ("could not figure out commit id for $treeish");
470   }
471
472   my $installpid = fork();
473   if ($installpid == 0)
474   {
475     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
476     exit (1);
477   }
478   while (1)
479   {
480     last if $installpid == waitpid (-1, WNOHANG);
481     freeze_if_want_freeze ($installpid);
482     select (undef, undef, undef, 0.1);
483   }
484   Log (undef, "Install exited $?");
485 }
486
487 if (!$have_slurm)
488 {
489   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
490   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
491 }
492
493 # If this job requires a Docker image, install that.
494 my $docker_bin = "/usr/bin/docker.io";
495 my ($docker_locator, $docker_hash);
496 if ($docker_locator = $Job->{docker_image_locator}) {
497   $docker_hash = find_docker_hash($docker_locator);
498   if (!$docker_hash)
499   {
500     croak("No Docker image hash found from locator $docker_locator");
501   }
502   my $docker_install_script = qq{
503 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
504     arv-get \Q$docker_locator/$docker_hash.tar\E | $docker_bin load
505 fi
506 };
507   my $docker_pid = fork();
508   if ($docker_pid == 0)
509   {
510     srun (["srun", "--nodelist=" . join(',', @node)],
511           ["/bin/sh", "-ec", $docker_install_script]);
512     exit ($?);
513   }
514   while (1)
515   {
516     last if $docker_pid == waitpid (-1, WNOHANG);
517     freeze_if_want_freeze ($docker_pid);
518     select (undef, undef, undef, 0.1);
519   }
520   if ($? != 0)
521   {
522     croak("Installing Docker image from $docker_locator returned exit code $?");
523   }
524 }
525
526 foreach (qw (script script_version script_parameters runtime_constraints))
527 {
528   Log (undef,
529        "$_ " .
530        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
531 }
532 foreach (split (/\n/, $Job->{knobs}))
533 {
534   Log (undef, "knob " . $_);
535 }
536
537
538
539 $main::success = undef;
540
541
542
543 ONELEVEL:
544
545 my $thisround_succeeded = 0;
546 my $thisround_failed = 0;
547 my $thisround_failed_multiple = 0;
548
549 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
550                        or $a <=> $b } @jobstep_todo;
551 my $level = $jobstep[$jobstep_todo[0]]->{level};
552 Log (undef, "start level $level");
553
554
555
556 my %proc;
557 my @freeslot = (0..$#slot);
558 my @holdslot;
559 my %reader;
560 my $progress_is_dirty = 1;
561 my $progress_stats_updated = 0;
562
563 update_progress_stats();
564
565
566
567 THISROUND:
568 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
569 {
570   my $id = $jobstep_todo[$todo_ptr];
571   my $Jobstep = $jobstep[$id];
572   if ($Jobstep->{level} != $level)
573   {
574     next;
575   }
576
577   pipe $reader{$id}, "writer" or croak ($!);
578   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
579   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
580
581   my $childslot = $freeslot[0];
582   my $childnode = $slot[$childslot]->{node};
583   my $childslotname = join (".",
584                             $slot[$childslot]->{node}->{name},
585                             $slot[$childslot]->{cpu});
586   my $childpid = fork();
587   if ($childpid == 0)
588   {
589     $SIG{'INT'} = 'DEFAULT';
590     $SIG{'QUIT'} = 'DEFAULT';
591     $SIG{'TERM'} = 'DEFAULT';
592
593     foreach (values (%reader))
594     {
595       close($_);
596     }
597     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
598     open(STDOUT,">&writer");
599     open(STDERR,">&writer");
600
601     undef $dbh;
602     undef $sth;
603
604     delete $ENV{"GNUPGHOME"};
605     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
606     $ENV{"TASK_QSEQUENCE"} = $id;
607     $ENV{"TASK_SEQUENCE"} = $level;
608     $ENV{"JOB_SCRIPT"} = $Job->{script};
609     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
610       $param =~ tr/a-z/A-Z/;
611       $ENV{"JOB_PARAMETER_$param"} = $value;
612     }
613     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
614     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
615     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
616     $ENV{"HOME"} = $ENV{"TASK_WORK"};
617     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
618     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
619     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
620     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
621
622     $ENV{"GZIP"} = "-n";
623
624     my @srunargs = (
625       "srun",
626       "--nodelist=".$childnode->{name},
627       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
628       "--job-name=$job_id.$id.$$",
629         );
630     my $build_script_to_send = "";
631     my $command =
632         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
633         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT}"
634         ."&& cd $ENV{CRUNCH_TMP} ";
635     if ($build_script)
636     {
637       $build_script_to_send = $build_script;
638       $command .=
639           "&& perl -";
640     }
641     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
642     if ($docker_hash)
643     {
644       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
645       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
646       # Dynamically configure the container to use the host system as its
647       # DNS server.  Get the host's global addresses from the ip command,
648       # and turn them into docker --dns options using gawk.
649       $command .=
650           q{$(ip -o address show scope global |
651               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
652       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
653       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
654       $command .= "--env=\QHOME=/home/crunch\E ";
655       while (my ($env_key, $env_val) = each %ENV)
656       {
657         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
658           if ($env_key eq "TASK_WORK") {
659             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
660           }
661           elsif ($env_key eq "TASK_KEEPMOUNT") {
662             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
663           }
664           elsif ($env_key eq "CRUNCH_SRC") {
665             $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
666           }
667           else {
668             $command .= "--env=\Q$env_key=$env_val\E ";
669           }
670         }
671       }
672       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
673       $command .= "\Q$docker_hash\E ";
674       $command .= "stdbuf --output=0 --error=0 ";
675       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
676     } else {
677       # Non-docker run
678       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
679       $command .= "stdbuf --output=0 --error=0 ";
680       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
681     }
682
683     my @execargs = ('bash', '-c', $command);
684     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
685     exit (111);
686   }
687   close("writer");
688   if (!defined $childpid)
689   {
690     close $reader{$id};
691     delete $reader{$id};
692     next;
693   }
694   shift @freeslot;
695   $proc{$childpid} = { jobstep => $id,
696                        time => time,
697                        slot => $childslot,
698                        jobstepname => "$job_id.$id.$childpid",
699                      };
700   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
701   $slot[$childslot]->{pid} = $childpid;
702
703   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
704   Log ($id, "child $childpid started on $childslotname");
705   $Jobstep->{starttime} = time;
706   $Jobstep->{node} = $childnode->{name};
707   $Jobstep->{slotindex} = $childslot;
708   delete $Jobstep->{stderr};
709   delete $Jobstep->{finishtime};
710
711   splice @jobstep_todo, $todo_ptr, 1;
712   --$todo_ptr;
713
714   $progress_is_dirty = 1;
715
716   while (!@freeslot
717          ||
718          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
719   {
720     last THISROUND if $main::please_freeze;
721     if ($main::please_info)
722     {
723       $main::please_info = 0;
724       freeze();
725       collate_output();
726       save_meta(1);
727       update_progress_stats();
728     }
729     my $gotsome
730         = readfrompipes ()
731         + reapchildren ();
732     if (!$gotsome)
733     {
734       check_refresh_wanted();
735       check_squeue();
736       update_progress_stats();
737       select (undef, undef, undef, 0.1);
738     }
739     elsif (time - $progress_stats_updated >= 30)
740     {
741       update_progress_stats();
742     }
743     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
744         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
745     {
746       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
747           .($thisround_failed+$thisround_succeeded)
748           .") -- giving up on this round";
749       Log (undef, $message);
750       last THISROUND;
751     }
752
753     # move slots from freeslot to holdslot (or back to freeslot) if necessary
754     for (my $i=$#freeslot; $i>=0; $i--) {
755       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
756         push @holdslot, (splice @freeslot, $i, 1);
757       }
758     }
759     for (my $i=$#holdslot; $i>=0; $i--) {
760       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
761         push @freeslot, (splice @holdslot, $i, 1);
762       }
763     }
764
765     # give up if no nodes are succeeding
766     if (!grep { $_->{node}->{losing_streak} == 0 &&
767                     $_->{node}->{hold_count} < 4 } @slot) {
768       my $message = "Every node has failed -- giving up on this round";
769       Log (undef, $message);
770       last THISROUND;
771     }
772   }
773 }
774
775
776 push @freeslot, splice @holdslot;
777 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
778
779
780 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
781 while (%proc)
782 {
783   if ($main::please_continue) {
784     $main::please_continue = 0;
785     goto THISROUND;
786   }
787   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
788   readfrompipes ();
789   if (!reapchildren())
790   {
791     check_refresh_wanted();
792     check_squeue();
793     update_progress_stats();
794     select (undef, undef, undef, 0.1);
795     killem (keys %proc) if $main::please_freeze;
796   }
797 }
798
799 update_progress_stats();
800 freeze_if_want_freeze();
801
802
803 if (!defined $main::success)
804 {
805   if (@jobstep_todo &&
806       $thisround_succeeded == 0 &&
807       ($thisround_failed == 0 || $thisround_failed > 4))
808   {
809     my $message = "stop because $thisround_failed tasks failed and none succeeded";
810     Log (undef, $message);
811     $main::success = 0;
812   }
813   if (!@jobstep_todo)
814   {
815     $main::success = 1;
816   }
817 }
818
819 goto ONELEVEL if !defined $main::success;
820
821
822 release_allocation();
823 freeze();
824 my $collated_output = &collate_output();
825
826 if ($job_has_uuid) {
827   $Job->update_attributes('running' => 0,
828                           'success' => $collated_output && $main::success,
829                           'finished_at' => scalar gmtime)
830 }
831
832 if ($collated_output)
833 {
834   eval {
835     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
836         or die "failed to get collated manifest: $!";
837     # Read the original manifest, and strip permission hints from it,
838     # so we can put the result in a Collection.
839     my @stripped_manifest_lines = ();
840     my $orig_manifest_text = '';
841     while (my $manifest_line = <$orig_manifest>) {
842       $orig_manifest_text .= $manifest_line;
843       my @words = split(/ /, $manifest_line, -1);
844       foreach my $ii (0..$#words) {
845         if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
846           $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
847         }
848       }
849       push(@stripped_manifest_lines, join(" ", @words));
850     }
851     my $stripped_manifest_text = join("", @stripped_manifest_lines);
852     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
853       'uuid' => md5_hex($stripped_manifest_text),
854       'manifest_text' => $orig_manifest_text,
855     });
856     $Job->update_attributes('output' => $output->{uuid});
857     if ($Job->{'output_is_persistent'}) {
858       $arv->{'links'}->{'create'}->execute('link' => {
859         'tail_kind' => 'arvados#user',
860         'tail_uuid' => $User->{'uuid'},
861         'head_kind' => 'arvados#collection',
862         'head_uuid' => $Job->{'output'},
863         'link_class' => 'resources',
864         'name' => 'wants',
865       });
866     }
867   };
868   if ($@) {
869     Log (undef, "Failed to register output manifest: $@");
870   }
871 }
872
873 Log (undef, "finish");
874
875 save_meta();
876 exit 0;
877
878
879
880 sub update_progress_stats
881 {
882   $progress_stats_updated = time;
883   return if !$progress_is_dirty;
884   my ($todo, $done, $running) = (scalar @jobstep_todo,
885                                  scalar @jobstep_done,
886                                  scalar @slot - scalar @freeslot - scalar @holdslot);
887   $Job->{'tasks_summary'} ||= {};
888   $Job->{'tasks_summary'}->{'todo'} = $todo;
889   $Job->{'tasks_summary'}->{'done'} = $done;
890   $Job->{'tasks_summary'}->{'running'} = $running;
891   if ($job_has_uuid) {
892     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
893   }
894   Log (undef, "status: $done done, $running running, $todo todo");
895   $progress_is_dirty = 0;
896 }
897
898
899
900 sub reapchildren
901 {
902   my $pid = waitpid (-1, WNOHANG);
903   return 0 if $pid <= 0;
904
905   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
906                   . "."
907                   . $slot[$proc{$pid}->{slot}]->{cpu});
908   my $jobstepid = $proc{$pid}->{jobstep};
909   my $elapsed = time - $proc{$pid}->{time};
910   my $Jobstep = $jobstep[$jobstepid];
911
912   my $childstatus = $?;
913   my $exitvalue = $childstatus >> 8;
914   my $exitinfo = sprintf("exit %d signal %d%s",
915                          $exitvalue,
916                          $childstatus & 127,
917                          ($childstatus & 128 ? ' core dump' : ''));
918   $Jobstep->{'arvados_task'}->reload;
919   my $task_success = $Jobstep->{'arvados_task'}->{success};
920
921   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
922
923   if (!defined $task_success) {
924     # task did not indicate one way or the other --> fail
925     $Jobstep->{'arvados_task'}->{success} = 0;
926     $Jobstep->{'arvados_task'}->save;
927     $task_success = 0;
928   }
929
930   if (!$task_success)
931   {
932     my $temporary_fail;
933     $temporary_fail ||= $Jobstep->{node_fail};
934     $temporary_fail ||= ($exitvalue == 111);
935
936     ++$thisround_failed;
937     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
938
939     # Check for signs of a failed or misconfigured node
940     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
941         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
942       # Don't count this against jobstep failure thresholds if this
943       # node is already suspected faulty and srun exited quickly
944       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
945           $elapsed < 5) {
946         Log ($jobstepid, "blaming failure on suspect node " .
947              $slot[$proc{$pid}->{slot}]->{node}->{name});
948         $temporary_fail ||= 1;
949       }
950       ban_node_by_slot($proc{$pid}->{slot});
951     }
952
953     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
954                              ++$Jobstep->{'failures'},
955                              $temporary_fail ? 'temporary ' : 'permanent',
956                              $elapsed));
957
958     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
959       # Give up on this task, and the whole job
960       $main::success = 0;
961       $main::please_freeze = 1;
962     }
963     else {
964       # Put this task back on the todo queue
965       push @jobstep_todo, $jobstepid;
966     }
967     $Job->{'tasks_summary'}->{'failed'}++;
968   }
969   else
970   {
971     ++$thisround_succeeded;
972     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
973     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
974     push @jobstep_done, $jobstepid;
975     Log ($jobstepid, "success in $elapsed seconds");
976   }
977   $Jobstep->{exitcode} = $childstatus;
978   $Jobstep->{finishtime} = time;
979   process_stderr ($jobstepid, $task_success);
980   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
981
982   close $reader{$jobstepid};
983   delete $reader{$jobstepid};
984   delete $slot[$proc{$pid}->{slot}]->{pid};
985   push @freeslot, $proc{$pid}->{slot};
986   delete $proc{$pid};
987
988   # Load new tasks
989   my $newtask_list = [];
990   my $newtask_results;
991   do {
992     $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
993       'where' => {
994         'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
995       },
996       'order' => 'qsequence',
997       'offset' => scalar(@$newtask_list),
998     );
999     push(@$newtask_list, @{$newtask_results->{items}});
1000   } while (@{$newtask_results->{items}});
1001   foreach my $arvados_task (@$newtask_list) {
1002     my $jobstep = {
1003       'level' => $arvados_task->{'sequence'},
1004       'failures' => 0,
1005       'arvados_task' => $arvados_task
1006     };
1007     push @jobstep, $jobstep;
1008     push @jobstep_todo, $#jobstep;
1009   }
1010
1011   $progress_is_dirty = 1;
1012   1;
1013 }
1014
1015 sub check_refresh_wanted
1016 {
1017   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1018   if (@stat && $stat[9] > $latest_refresh) {
1019     $latest_refresh = scalar time;
1020     if ($job_has_uuid) {
1021       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1022       for my $attr ('cancelled_at',
1023                     'cancelled_by_user_uuid',
1024                     'cancelled_by_client_uuid') {
1025         $Job->{$attr} = $Job2->{$attr};
1026       }
1027       if ($Job->{'cancelled_at'}) {
1028         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1029              " by user " . $Job->{cancelled_by_user_uuid});
1030         $main::success = 0;
1031         $main::please_freeze = 1;
1032       }
1033     }
1034   }
1035 }
1036
1037 sub check_squeue
1038 {
1039   # return if the kill list was checked <4 seconds ago
1040   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1041   {
1042     return;
1043   }
1044   $squeue_kill_checked = time;
1045
1046   # use killem() on procs whose killtime is reached
1047   for (keys %proc)
1048   {
1049     if (exists $proc{$_}->{killtime}
1050         && $proc{$_}->{killtime} <= time)
1051     {
1052       killem ($_);
1053     }
1054   }
1055
1056   # return if the squeue was checked <60 seconds ago
1057   if (defined $squeue_checked && $squeue_checked > time - 60)
1058   {
1059     return;
1060   }
1061   $squeue_checked = time;
1062
1063   if (!$have_slurm)
1064   {
1065     # here is an opportunity to check for mysterious problems with local procs
1066     return;
1067   }
1068
1069   # get a list of steps still running
1070   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1071   chop @squeue;
1072   if ($squeue[-1] ne "ok")
1073   {
1074     return;
1075   }
1076   pop @squeue;
1077
1078   # which of my jobsteps are running, according to squeue?
1079   my %ok;
1080   foreach (@squeue)
1081   {
1082     if (/^(\d+)\.(\d+) (\S+)/)
1083     {
1084       if ($1 eq $ENV{SLURM_JOBID})
1085       {
1086         $ok{$3} = 1;
1087       }
1088     }
1089   }
1090
1091   # which of my active child procs (>60s old) were not mentioned by squeue?
1092   foreach (keys %proc)
1093   {
1094     if ($proc{$_}->{time} < time - 60
1095         && !exists $ok{$proc{$_}->{jobstepname}}
1096         && !exists $proc{$_}->{killtime})
1097     {
1098       # kill this proc if it hasn't exited in 30 seconds
1099       $proc{$_}->{killtime} = time + 30;
1100     }
1101   }
1102 }
1103
1104
1105 sub release_allocation
1106 {
1107   if ($have_slurm)
1108   {
1109     Log (undef, "release job allocation");
1110     system "scancel $ENV{SLURM_JOBID}";
1111   }
1112 }
1113
1114
1115 sub readfrompipes
1116 {
1117   my $gotsome = 0;
1118   foreach my $job (keys %reader)
1119   {
1120     my $buf;
1121     while (0 < sysread ($reader{$job}, $buf, 8192))
1122     {
1123       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1124       $jobstep[$job]->{stderr} .= $buf;
1125       preprocess_stderr ($job);
1126       if (length ($jobstep[$job]->{stderr}) > 16384)
1127       {
1128         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1129       }
1130       $gotsome = 1;
1131     }
1132   }
1133   return $gotsome;
1134 }
1135
1136
1137 sub preprocess_stderr
1138 {
1139   my $job = shift;
1140
1141   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1142     my $line = $1;
1143     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1144     Log ($job, "stderr $line");
1145     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1146       # whoa.
1147       $main::please_freeze = 1;
1148     }
1149     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1150       $jobstep[$job]->{node_fail} = 1;
1151       ban_node_by_slot($jobstep[$job]->{slotindex});
1152     }
1153   }
1154 }
1155
1156
1157 sub process_stderr
1158 {
1159   my $job = shift;
1160   my $task_success = shift;
1161   preprocess_stderr ($job);
1162
1163   map {
1164     Log ($job, "stderr $_");
1165   } split ("\n", $jobstep[$job]->{stderr});
1166 }
1167
1168 sub fetch_block
1169 {
1170   my $hash = shift;
1171   my ($keep, $child_out, $output_block);
1172
1173   my $cmd = "arv-get \Q$hash\E";
1174   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1175   sysread($keep, $output_block, 64 * 1024 * 1024);
1176   close $keep;
1177   return $output_block;
1178 }
1179
1180 sub collate_output
1181 {
1182   Log (undef, "collate");
1183
1184   my ($child_out, $child_in);
1185   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1186   my $joboutput;
1187   for (@jobstep)
1188   {
1189     next if (!exists $_->{'arvados_task'}->{output} ||
1190              !$_->{'arvados_task'}->{'success'} ||
1191              $_->{'exitcode'} != 0);
1192     my $output = $_->{'arvados_task'}->{output};
1193     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1194     {
1195       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1196       print $child_in $output;
1197     }
1198     elsif (@jobstep == 1)
1199     {
1200       $joboutput = $output;
1201       last;
1202     }
1203     elsif (defined (my $outblock = fetch_block ($output)))
1204     {
1205       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1206       print $child_in $outblock;
1207     }
1208     else
1209     {
1210       Log (undef, "XXX fetch_block($output) failed XXX");
1211       $main::success = 0;
1212     }
1213   }
1214   $child_in->close;
1215
1216   if (!defined $joboutput) {
1217     my $s = IO::Select->new($child_out);
1218     if ($s->can_read(120)) {
1219       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1220       chomp($joboutput);
1221     } else {
1222       Log (undef, "timed out reading from 'arv-put'");
1223     }
1224   }
1225   waitpid($pid, 0);
1226
1227   if ($joboutput)
1228   {
1229     Log (undef, "output $joboutput");
1230     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1231   }
1232   else
1233   {
1234     Log (undef, "output undef");
1235   }
1236   return $joboutput;
1237 }
1238
1239
1240 sub killem
1241 {
1242   foreach (@_)
1243   {
1244     my $sig = 2;                # SIGINT first
1245     if (exists $proc{$_}->{"sent_$sig"} &&
1246         time - $proc{$_}->{"sent_$sig"} > 4)
1247     {
1248       $sig = 15;                # SIGTERM if SIGINT doesn't work
1249     }
1250     if (exists $proc{$_}->{"sent_$sig"} &&
1251         time - $proc{$_}->{"sent_$sig"} > 4)
1252     {
1253       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1254     }
1255     if (!exists $proc{$_}->{"sent_$sig"})
1256     {
1257       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1258       kill $sig, $_;
1259       select (undef, undef, undef, 0.1);
1260       if ($sig == 2)
1261       {
1262         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1263       }
1264       $proc{$_}->{"sent_$sig"} = time;
1265       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1266     }
1267   }
1268 }
1269
1270
1271 sub fhbits
1272 {
1273   my($bits);
1274   for (@_) {
1275     vec($bits,fileno($_),1) = 1;
1276   }
1277   $bits;
1278 }
1279
1280
1281 sub Log                         # ($jobstep_id, $logmessage)
1282 {
1283   if ($_[1] =~ /\n/) {
1284     for my $line (split (/\n/, $_[1])) {
1285       Log ($_[0], $line);
1286     }
1287     return;
1288   }
1289   my $fh = select STDERR; $|=1; select $fh;
1290   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1291   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1292   $message .= "\n";
1293   my $datetime;
1294   if ($local_logfile || -t STDERR) {
1295     my @gmtime = gmtime;
1296     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1297                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1298   }
1299   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1300
1301   if ($local_logfile) {
1302     print $local_logfile $datetime . " " . $message;
1303   }
1304 }
1305
1306
1307 sub croak
1308 {
1309   my ($package, $file, $line) = caller;
1310   my $message = "@_ at $file line $line\n";
1311   Log (undef, $message);
1312   freeze() if @jobstep_todo;
1313   collate_output() if @jobstep_todo;
1314   cleanup();
1315   save_meta() if $local_logfile;
1316   die;
1317 }
1318
1319
1320 sub cleanup
1321 {
1322   return if !$job_has_uuid;
1323   $Job->update_attributes('running' => 0,
1324                           'success' => 0,
1325                           'finished_at' => scalar gmtime);
1326 }
1327
1328
1329 sub save_meta
1330 {
1331   my $justcheckpoint = shift; # false if this will be the last meta saved
1332   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1333
1334   $local_logfile->flush;
1335   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1336       . quotemeta($local_logfile->filename);
1337   my $loglocator = `$cmd`;
1338   die "system $cmd failed: $?" if $?;
1339   chomp($loglocator);
1340
1341   $local_logfile = undef;   # the temp file is automatically deleted
1342   Log (undef, "log manifest is $loglocator");
1343   $Job->{'log'} = $loglocator;
1344   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1345 }
1346
1347
1348 sub freeze_if_want_freeze
1349 {
1350   if ($main::please_freeze)
1351   {
1352     release_allocation();
1353     if (@_)
1354     {
1355       # kill some srun procs before freeze+stop
1356       map { $proc{$_} = {} } @_;
1357       while (%proc)
1358       {
1359         killem (keys %proc);
1360         select (undef, undef, undef, 0.1);
1361         my $died;
1362         while (($died = waitpid (-1, WNOHANG)) > 0)
1363         {
1364           delete $proc{$died};
1365         }
1366       }
1367     }
1368     freeze();
1369     collate_output();
1370     cleanup();
1371     save_meta();
1372     exit 0;
1373   }
1374 }
1375
1376
1377 sub freeze
1378 {
1379   Log (undef, "Freeze not implemented");
1380   return;
1381 }
1382
1383
1384 sub thaw
1385 {
1386   croak ("Thaw not implemented");
1387 }
1388
1389
1390 sub freezequote
1391 {
1392   my $s = shift;
1393   $s =~ s/\\/\\\\/g;
1394   $s =~ s/\n/\\n/g;
1395   return $s;
1396 }
1397
1398
1399 sub freezeunquote
1400 {
1401   my $s = shift;
1402   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1403   return $s;
1404 }
1405
1406
1407 sub srun
1408 {
1409   my $srunargs = shift;
1410   my $execargs = shift;
1411   my $opts = shift || {};
1412   my $stdin = shift;
1413   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1414   print STDERR (join (" ",
1415                       map { / / ? "'$_'" : $_ }
1416                       (@$args)),
1417                 "\n")
1418       if $ENV{CRUNCH_DEBUG};
1419
1420   if (defined $stdin) {
1421     my $child = open STDIN, "-|";
1422     defined $child or die "no fork: $!";
1423     if ($child == 0) {
1424       print $stdin or die $!;
1425       close STDOUT or die $!;
1426       exit 0;
1427     }
1428   }
1429
1430   return system (@$args) if $opts->{fork};
1431
1432   exec @$args;
1433   warn "ENV size is ".length(join(" ",%ENV));
1434   die "exec failed: $!: @$args";
1435 }
1436
1437
1438 sub ban_node_by_slot {
1439   # Don't start any new jobsteps on this node for 60 seconds
1440   my $slotid = shift;
1441   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1442   $slot[$slotid]->{node}->{hold_count}++;
1443   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1444 }
1445
1446 sub must_lock_now
1447 {
1448   my ($lockfile, $error_message) = @_;
1449   open L, ">", $lockfile or croak("$lockfile: $!");
1450   if (!flock L, LOCK_EX|LOCK_NB) {
1451     croak("Can't lock $lockfile: $error_message\n");
1452   }
1453 }
1454
1455 sub find_docker_hash {
1456   # Given a Keep locator, search for a matching link to find the Docker hash
1457   # of the stored image.
1458   my $locator = shift;
1459   my $links_result = $arv->{links}->{list}->execute(
1460     filters => [["head_uuid", "=", $locator],
1461                 ["link_class", "=", "docker_image_hash"]],
1462     limit => 1);
1463   my $docker_hash;
1464   foreach my $link (@{$links_result->{items}}) {
1465     $docker_hash = lc($link->{name});
1466   }
1467   return $docker_hash;
1468 }
1469
1470 __DATA__
1471 #!/usr/bin/perl
1472
1473 # checkout-and-build
1474
1475 use Fcntl ':flock';
1476
1477 my $destdir = $ENV{"CRUNCH_SRC"};
1478 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1479 my $repo = $ENV{"CRUNCH_SRC_URL"};
1480
1481 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1482 flock L, LOCK_EX;
1483 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1484     exit 0;
1485 }
1486
1487 unlink "$destdir.commit";
1488 open STDOUT, ">", "$destdir.log";
1489 open STDERR, ">&STDOUT";
1490
1491 mkdir $destdir;
1492 my @git_archive_data = <DATA>;
1493 if (@git_archive_data) {
1494   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1495   print TARX @git_archive_data;
1496   if(!close(TARX)) {
1497     die "'tar -C $destdir -xf -' exited $?: $!";
1498   }
1499 }
1500
1501 my $pwd;
1502 chomp ($pwd = `pwd`);
1503 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1504 mkdir $install_dir;
1505
1506 for my $src_path ("$destdir/arvados/sdk/python") {
1507   if (-d $src_path) {
1508     shell_or_die ("virtualenv", $install_dir);
1509     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1510   }
1511 }
1512
1513 if (-e "$destdir/crunch_scripts/install") {
1514     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1515 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1516     # Old version
1517     shell_or_die ("./tests/autotests.sh", $install_dir);
1518 } elsif (-e "./install.sh") {
1519     shell_or_die ("./install.sh", $install_dir);
1520 }
1521
1522 if ($commit) {
1523     unlink "$destdir.commit.new";
1524     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1525     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1526 }
1527
1528 close L;
1529
1530 exit 0;
1531
1532 sub shell_or_die
1533 {
1534   if ($ENV{"DEBUG"}) {
1535     print STDERR "@_\n";
1536   }
1537   system (@_) == 0
1538       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1539 }
1540
1541 __DATA__