3342: Handle case in PipelineInstance#friendly_link_name where
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Digest::MD5 qw(md5_hex);
80 use Getopt::Long;
81 use IPC::Open2;
82 use IO::Select;
83 use File::Temp;
84 use Fcntl ':flock';
85 use File::Path qw( make_path );
86
87 use constant EX_TEMPFAIL => 75;
88
89 $ENV{"TMPDIR"} ||= "/tmp";
90 unless (defined $ENV{"CRUNCH_TMP"}) {
91   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
92   if ($ENV{"USER"} ne "crunch" && $< != 0) {
93     # use a tmp dir unique for my uid
94     $ENV{"CRUNCH_TMP"} .= "-$<";
95   }
96 }
97
98 # Create the tmp directory if it does not exist
99 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
100   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
101 }
102
103 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
104 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
105 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
106 mkdir ($ENV{"JOB_WORK"});
107
108 my $force_unlock;
109 my $git_dir;
110 my $jobspec;
111 my $job_api_token;
112 my $no_clear_tmp;
113 my $resume_stash;
114 GetOptions('force-unlock' => \$force_unlock,
115            'git-dir=s' => \$git_dir,
116            'job=s' => \$jobspec,
117            'job-api-token=s' => \$job_api_token,
118            'no-clear-tmp' => \$no_clear_tmp,
119            'resume-stash=s' => \$resume_stash,
120     );
121
122 if (defined $job_api_token) {
123   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
124 }
125
126 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
127 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
128 my $local_job = !$job_has_uuid;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job = {};
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($job_has_uuid)
152 {
153   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154   if (!$force_unlock) {
155     # If some other crunch-job process has grabbed this job (or we see
156     # other evidence that the job is already underway) we exit
157     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
158     # mark the job as failed.
159     if ($Job->{'is_locked_by_uuid'}) {
160       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
161       exit EX_TEMPFAIL;
162     }
163     if ($Job->{'success'} ne undef) {
164       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
165       exit EX_TEMPFAIL;
166     }
167     if ($Job->{'running'}) {
168       Log(undef, "Job 'running' flag is already set");
169       exit EX_TEMPFAIL;
170     }
171     if ($Job->{'started_at'}) {
172       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
173       exit EX_TEMPFAIL;
174     }
175   }
176 }
177 else
178 {
179   $Job = JSON::decode_json($jobspec);
180
181   if (!$resume_stash)
182   {
183     map { croak ("No $_ specified") unless $Job->{$_} }
184     qw(script script_version script_parameters);
185   }
186
187   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
188   $Job->{'started_at'} = gmtime;
189
190   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
191
192   $job_has_uuid = 1;
193 }
194 $job_id = $Job->{'uuid'};
195
196 my $keep_logfile = $job_id . '.log.txt';
197 $local_logfile = File::Temp->new();
198
199 $Job->{'runtime_constraints'} ||= {};
200 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
201 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
202
203
204 Log (undef, "check slurm allocation");
205 my @slot;
206 my @node;
207 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
208 my @sinfo;
209 if (!$have_slurm)
210 {
211   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
212   push @sinfo, "$localcpus localhost";
213 }
214 if (exists $ENV{SLURM_NODELIST})
215 {
216   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
217 }
218 foreach (@sinfo)
219 {
220   my ($ncpus, $slurm_nodelist) = split;
221   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
222
223   my @nodelist;
224   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
225   {
226     my $nodelist = $1;
227     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
228     {
229       my $ranges = $1;
230       foreach (split (",", $ranges))
231       {
232         my ($a, $b);
233         if (/(\d+)-(\d+)/)
234         {
235           $a = $1;
236           $b = $2;
237         }
238         else
239         {
240           $a = $_;
241           $b = $_;
242         }
243         push @nodelist, map {
244           my $n = $nodelist;
245           $n =~ s/\[[-,\d]+\]/$_/;
246           $n;
247         } ($a..$b);
248       }
249     }
250     else
251     {
252       push @nodelist, $nodelist;
253     }
254   }
255   foreach my $nodename (@nodelist)
256   {
257     Log (undef, "node $nodename - $ncpus slots");
258     my $node = { name => $nodename,
259                  ncpus => $ncpus,
260                  losing_streak => 0,
261                  hold_until => 0 };
262     foreach my $cpu (1..$ncpus)
263     {
264       push @slot, { node => $node,
265                     cpu => $cpu };
266     }
267   }
268   push @node, @nodelist;
269 }
270
271
272
273 # Ensure that we get one jobstep running on each allocated node before
274 # we start overloading nodes with concurrent steps
275
276 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
277
278
279
280 my $jobmanager_id;
281 if ($job_has_uuid)
282 {
283   # Claim this job, and make sure nobody else does
284   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
285           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
286     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
287     exit EX_TEMPFAIL;
288   }
289   $Job->update_attributes('started_at' => scalar gmtime,
290                           'running' => 1,
291                           'success' => undef,
292                           'tasks_summary' => { 'failed' => 0,
293                                                'todo' => 1,
294                                                'running' => 0,
295                                                'done' => 0 });
296 }
297
298
299 Log (undef, "start");
300 $SIG{'INT'} = sub { $main::please_freeze = 1; };
301 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
302 $SIG{'TERM'} = \&croak;
303 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
304 $SIG{'ALRM'} = sub { $main::please_info = 1; };
305 $SIG{'CONT'} = sub { $main::please_continue = 1; };
306 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
307
308 $main::please_freeze = 0;
309 $main::please_info = 0;
310 $main::please_continue = 0;
311 $main::please_refresh = 0;
312 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
313
314 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
315 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
316 $ENV{"JOB_UUID"} = $job_id;
317
318
319 my @jobstep;
320 my @jobstep_todo = ();
321 my @jobstep_done = ();
322 my @jobstep_tomerge = ();
323 my $jobstep_tomerge_level = 0;
324 my $squeue_checked;
325 my $squeue_kill_checked;
326 my $output_in_keep = 0;
327 my $latest_refresh = scalar time;
328
329
330
331 if (defined $Job->{thawedfromkey})
332 {
333   thaw ($Job->{thawedfromkey});
334 }
335 else
336 {
337   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
338     'job_uuid' => $Job->{'uuid'},
339     'sequence' => 0,
340     'qsequence' => 0,
341     'parameters' => {},
342                                                           });
343   push @jobstep, { 'level' => 0,
344                    'failures' => 0,
345                    'arvados_task' => $first_task,
346                  };
347   push @jobstep_todo, 0;
348 }
349
350
351 if (!$have_slurm)
352 {
353   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
354 }
355
356
357 my $build_script;
358
359
360 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
361
362 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
363 if ($skip_install)
364 {
365   if (!defined $no_clear_tmp) {
366     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
367     system($clear_tmp_cmd) == 0
368         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
369   }
370   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
371   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
372     if (-d $src_path) {
373       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
374           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
375       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
376           == 0
377           or croak ("setup.py in $src_path failed: exit ".($?>>8));
378     }
379   }
380 }
381 else
382 {
383   do {
384     local $/ = undef;
385     $build_script = <DATA>;
386   };
387   Log (undef, "Install revision ".$Job->{script_version});
388   my $nodelist = join(",", @node);
389
390   if (!defined $no_clear_tmp) {
391     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
392
393     my $cleanpid = fork();
394     if ($cleanpid == 0)
395     {
396       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
397             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
398       exit (1);
399     }
400     while (1)
401     {
402       last if $cleanpid == waitpid (-1, WNOHANG);
403       freeze_if_want_freeze ($cleanpid);
404       select (undef, undef, undef, 0.1);
405     }
406     Log (undef, "Clean-work-dir exited $?");
407   }
408
409   # Install requested code version
410
411   my @execargs;
412   my @srunargs = ("srun",
413                   "--nodelist=$nodelist",
414                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
415
416   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
417   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
418
419   my $commit;
420   my $git_archive;
421   my $treeish = $Job->{'script_version'};
422
423   # If we're running under crunch-dispatch, it will have pulled the
424   # appropriate source tree into its own repository, and given us that
425   # repo's path as $git_dir. If we're running a "local" job, and a
426   # script_version was specified, it's up to the user to provide the
427   # full path to a local repository in Job->{repository}.
428   #
429   # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
430   # git-archive --remote where appropriate.
431   #
432   # TODO: Accept a locally-hosted Arvados repository by name or
433   # UUID. Use arvados.v1.repositories.list or .get to figure out the
434   # appropriate fetch-url.
435   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
436
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   if (-d "$repo/.git") {
440     # We were given a working directory, but we are only interested in
441     # the index.
442     $repo = "$repo/.git";
443   }
444
445   # If this looks like a subversion r#, look for it in git-svn commit messages
446
447   if ($treeish =~ m{^\d{1,4}$}) {
448     my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
449     chomp $gitlog;
450     if ($gitlog =~ /^[a-f0-9]{40}$/) {
451       $commit = $gitlog;
452       Log (undef, "Using commit $commit for script_version $treeish");
453     }
454   }
455
456   # If that didn't work, try asking git to look it up as a tree-ish.
457
458   if (!defined $commit) {
459     my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
460     chomp $found;
461     if ($found =~ /^[0-9a-f]{40}$/s) {
462       $commit = $found;
463       if ($commit ne $treeish) {
464         # Make sure we record the real commit id in the database,
465         # frozentokey, logs, etc. -- instead of an abbreviation or a
466         # branch name which can become ambiguous or point to a
467         # different commit in the future.
468         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
469         Log (undef, "Using commit $commit for tree-ish $treeish");
470         if ($commit ne $treeish) {
471           $Job->{'script_version'} = $commit;
472           !$job_has_uuid or
473               $Job->update_attributes('script_version' => $commit) or
474               croak("Error while updating job");
475         }
476       }
477     }
478   }
479
480   if (defined $commit) {
481     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482     @execargs = ("sh", "-c",
483                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484     $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485   }
486   else {
487     croak ("could not figure out commit id for $treeish");
488   }
489
490   my $installpid = fork();
491   if ($installpid == 0)
492   {
493     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
494     exit (1);
495   }
496   while (1)
497   {
498     last if $installpid == waitpid (-1, WNOHANG);
499     freeze_if_want_freeze ($installpid);
500     select (undef, undef, undef, 0.1);
501   }
502   Log (undef, "Install exited $?");
503 }
504
505 if (!$have_slurm)
506 {
507   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
508   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
509 }
510
511 # If this job requires a Docker image, install that.
512 my $docker_bin = "/usr/bin/docker.io";
513 my ($docker_locator, $docker_stream, $docker_hash);
514 if ($docker_locator = $Job->{docker_image_locator}) {
515   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
516   if (!$docker_hash)
517   {
518     croak("No Docker image hash found from locator $docker_locator");
519   }
520   $docker_stream =~ s/^\.//;
521   my $docker_install_script = qq{
522 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
523     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
524 fi
525 };
526   my $docker_pid = fork();
527   if ($docker_pid == 0)
528   {
529     srun (["srun", "--nodelist=" . join(',', @node)],
530           ["/bin/sh", "-ec", $docker_install_script]);
531     exit ($?);
532   }
533   while (1)
534   {
535     last if $docker_pid == waitpid (-1, WNOHANG);
536     freeze_if_want_freeze ($docker_pid);
537     select (undef, undef, undef, 0.1);
538   }
539   if ($? != 0)
540   {
541     croak("Installing Docker image from $docker_locator returned exit code $?");
542   }
543 }
544
545 foreach (qw (script script_version script_parameters runtime_constraints))
546 {
547   Log (undef,
548        "$_ " .
549        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
550 }
551 foreach (split (/\n/, $Job->{knobs}))
552 {
553   Log (undef, "knob " . $_);
554 }
555
556
557
558 $main::success = undef;
559
560
561
562 ONELEVEL:
563
564 my $thisround_succeeded = 0;
565 my $thisround_failed = 0;
566 my $thisround_failed_multiple = 0;
567
568 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
569                        or $a <=> $b } @jobstep_todo;
570 my $level = $jobstep[$jobstep_todo[0]]->{level};
571 Log (undef, "start level $level");
572
573
574
575 my %proc;
576 my @freeslot = (0..$#slot);
577 my @holdslot;
578 my %reader;
579 my $progress_is_dirty = 1;
580 my $progress_stats_updated = 0;
581
582 update_progress_stats();
583
584
585
586 THISROUND:
587 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
588 {
589   my $id = $jobstep_todo[$todo_ptr];
590   my $Jobstep = $jobstep[$id];
591   if ($Jobstep->{level} != $level)
592   {
593     next;
594   }
595
596   pipe $reader{$id}, "writer" or croak ($!);
597   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
598   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
599
600   my $childslot = $freeslot[0];
601   my $childnode = $slot[$childslot]->{node};
602   my $childslotname = join (".",
603                             $slot[$childslot]->{node}->{name},
604                             $slot[$childslot]->{cpu});
605   my $childpid = fork();
606   if ($childpid == 0)
607   {
608     $SIG{'INT'} = 'DEFAULT';
609     $SIG{'QUIT'} = 'DEFAULT';
610     $SIG{'TERM'} = 'DEFAULT';
611
612     foreach (values (%reader))
613     {
614       close($_);
615     }
616     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
617     open(STDOUT,">&writer");
618     open(STDERR,">&writer");
619
620     undef $dbh;
621     undef $sth;
622
623     delete $ENV{"GNUPGHOME"};
624     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
625     $ENV{"TASK_QSEQUENCE"} = $id;
626     $ENV{"TASK_SEQUENCE"} = $level;
627     $ENV{"JOB_SCRIPT"} = $Job->{script};
628     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
629       $param =~ tr/a-z/A-Z/;
630       $ENV{"JOB_PARAMETER_$param"} = $value;
631     }
632     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
633     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
634     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
635     $ENV{"HOME"} = $ENV{"TASK_WORK"};
636     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
637     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
638     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
639     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
640
641     $ENV{"GZIP"} = "-n";
642
643     my @srunargs = (
644       "srun",
645       "--nodelist=".$childnode->{name},
646       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
647       "--job-name=$job_id.$id.$$",
648         );
649     my $build_script_to_send = "";
650     my $command =
651         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
652         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
653         ."&& cd $ENV{CRUNCH_TMP} ";
654     if ($build_script)
655     {
656       $build_script_to_send = $build_script;
657       $command .=
658           "&& perl -";
659     }
660     $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
661     if ($docker_hash)
662     {
663       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
664       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
665       # Dynamically configure the container to use the host system as its
666       # DNS server.  Get the host's global addresses from the ip command,
667       # and turn them into docker --dns options using gawk.
668       $command .=
669           q{$(ip -o address show scope global |
670               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
671       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
672       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
673       $command .= "--env=\QHOME=/home/crunch\E ";
674       while (my ($env_key, $env_val) = each %ENV)
675       {
676         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
677           if ($env_key eq "TASK_KEEPMOUNT") {
678             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
679           }
680           else {
681             $command .= "--env=\Q$env_key=$env_val\E ";
682           }
683         }
684       }
685       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
686       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
687       $command .= "\Q$docker_hash\E ";
688       $command .= "stdbuf --output=0 --error=0 ";
689       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
690     } else {
691       # Non-docker run
692       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
693       $command .= "stdbuf --output=0 --error=0 ";
694       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
695     }
696
697     my @execargs = ('bash', '-c', $command);
698     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
699     # exec() failed, we assume nothing happened.
700     Log(undef, "srun() failed on build script");
701     die;
702   }
703   close("writer");
704   if (!defined $childpid)
705   {
706     close $reader{$id};
707     delete $reader{$id};
708     next;
709   }
710   shift @freeslot;
711   $proc{$childpid} = { jobstep => $id,
712                        time => time,
713                        slot => $childslot,
714                        jobstepname => "$job_id.$id.$childpid",
715                      };
716   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
717   $slot[$childslot]->{pid} = $childpid;
718
719   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
720   Log ($id, "child $childpid started on $childslotname");
721   $Jobstep->{starttime} = time;
722   $Jobstep->{node} = $childnode->{name};
723   $Jobstep->{slotindex} = $childslot;
724   delete $Jobstep->{stderr};
725   delete $Jobstep->{finishtime};
726
727   splice @jobstep_todo, $todo_ptr, 1;
728   --$todo_ptr;
729
730   $progress_is_dirty = 1;
731
732   while (!@freeslot
733          ||
734          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
735   {
736     last THISROUND if $main::please_freeze;
737     if ($main::please_info)
738     {
739       $main::please_info = 0;
740       freeze();
741       collate_output();
742       save_meta(1);
743       update_progress_stats();
744     }
745     my $gotsome
746         = readfrompipes ()
747         + reapchildren ();
748     if (!$gotsome)
749     {
750       check_refresh_wanted();
751       check_squeue();
752       update_progress_stats();
753       select (undef, undef, undef, 0.1);
754     }
755     elsif (time - $progress_stats_updated >= 30)
756     {
757       update_progress_stats();
758     }
759     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
760         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
761     {
762       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
763           .($thisround_failed+$thisround_succeeded)
764           .") -- giving up on this round";
765       Log (undef, $message);
766       last THISROUND;
767     }
768
769     # move slots from freeslot to holdslot (or back to freeslot) if necessary
770     for (my $i=$#freeslot; $i>=0; $i--) {
771       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
772         push @holdslot, (splice @freeslot, $i, 1);
773       }
774     }
775     for (my $i=$#holdslot; $i>=0; $i--) {
776       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
777         push @freeslot, (splice @holdslot, $i, 1);
778       }
779     }
780
781     # give up if no nodes are succeeding
782     if (!grep { $_->{node}->{losing_streak} == 0 &&
783                     $_->{node}->{hold_count} < 4 } @slot) {
784       my $message = "Every node has failed -- giving up on this round";
785       Log (undef, $message);
786       last THISROUND;
787     }
788   }
789 }
790
791
792 push @freeslot, splice @holdslot;
793 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
794
795
796 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
797 while (%proc)
798 {
799   if ($main::please_continue) {
800     $main::please_continue = 0;
801     goto THISROUND;
802   }
803   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
804   readfrompipes ();
805   if (!reapchildren())
806   {
807     check_refresh_wanted();
808     check_squeue();
809     update_progress_stats();
810     select (undef, undef, undef, 0.1);
811     killem (keys %proc) if $main::please_freeze;
812   }
813 }
814
815 update_progress_stats();
816 freeze_if_want_freeze();
817
818
819 if (!defined $main::success)
820 {
821   if (@jobstep_todo &&
822       $thisround_succeeded == 0 &&
823       ($thisround_failed == 0 || $thisround_failed > 4))
824   {
825     my $message = "stop because $thisround_failed tasks failed and none succeeded";
826     Log (undef, $message);
827     $main::success = 0;
828   }
829   if (!@jobstep_todo)
830   {
831     $main::success = 1;
832   }
833 }
834
835 goto ONELEVEL if !defined $main::success;
836
837
838 release_allocation();
839 freeze();
840 my $collated_output = &collate_output();
841
842 if ($job_has_uuid) {
843   $Job->update_attributes('running' => 0,
844                           'success' => $collated_output && $main::success,
845                           'finished_at' => scalar gmtime)
846 }
847
848 if (!$collated_output) {
849   Log(undef, "output undef");
850 }
851 else {
852   eval {
853     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
854         or die "failed to get collated manifest: $!";
855     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
856       'manifest_text' => $orig_manifest_text,
857     });
858     Log(undef, "output " . $output->{portable_data_hash});
859     $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
860   };
861   if ($@) {
862     Log (undef, "Failed to register output manifest: $@");
863   }
864 }
865
866 Log (undef, "finish");
867
868 save_meta();
869 exit ($Job->{'success'} ? 1 : 0);
870
871
872
873 sub update_progress_stats
874 {
875   $progress_stats_updated = time;
876   return if !$progress_is_dirty;
877   my ($todo, $done, $running) = (scalar @jobstep_todo,
878                                  scalar @jobstep_done,
879                                  scalar @slot - scalar @freeslot - scalar @holdslot);
880   $Job->{'tasks_summary'} ||= {};
881   $Job->{'tasks_summary'}->{'todo'} = $todo;
882   $Job->{'tasks_summary'}->{'done'} = $done;
883   $Job->{'tasks_summary'}->{'running'} = $running;
884   if ($job_has_uuid) {
885     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
886   }
887   Log (undef, "status: $done done, $running running, $todo todo");
888   $progress_is_dirty = 0;
889 }
890
891
892
893 sub reapchildren
894 {
895   my $pid = waitpid (-1, WNOHANG);
896   return 0 if $pid <= 0;
897
898   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
899                   . "."
900                   . $slot[$proc{$pid}->{slot}]->{cpu});
901   my $jobstepid = $proc{$pid}->{jobstep};
902   my $elapsed = time - $proc{$pid}->{time};
903   my $Jobstep = $jobstep[$jobstepid];
904
905   my $childstatus = $?;
906   my $exitvalue = $childstatus >> 8;
907   my $exitinfo = sprintf("exit %d signal %d%s",
908                          $exitvalue,
909                          $childstatus & 127,
910                          ($childstatus & 128 ? ' core dump' : ''));
911   $Jobstep->{'arvados_task'}->reload;
912   my $task_success = $Jobstep->{'arvados_task'}->{success};
913
914   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
915
916   if (!defined $task_success) {
917     # task did not indicate one way or the other --> fail
918     $Jobstep->{'arvados_task'}->{success} = 0;
919     $Jobstep->{'arvados_task'}->save;
920     $task_success = 0;
921   }
922
923   if (!$task_success)
924   {
925     my $temporary_fail;
926     $temporary_fail ||= $Jobstep->{node_fail};
927     $temporary_fail ||= ($exitvalue == 111);
928
929     ++$thisround_failed;
930     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
931
932     # Check for signs of a failed or misconfigured node
933     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
934         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
935       # Don't count this against jobstep failure thresholds if this
936       # node is already suspected faulty and srun exited quickly
937       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
938           $elapsed < 5) {
939         Log ($jobstepid, "blaming failure on suspect node " .
940              $slot[$proc{$pid}->{slot}]->{node}->{name});
941         $temporary_fail ||= 1;
942       }
943       ban_node_by_slot($proc{$pid}->{slot});
944     }
945
946     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
947                              ++$Jobstep->{'failures'},
948                              $temporary_fail ? 'temporary ' : 'permanent',
949                              $elapsed));
950
951     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
952       # Give up on this task, and the whole job
953       $main::success = 0;
954       $main::please_freeze = 1;
955     }
956     else {
957       # Put this task back on the todo queue
958       push @jobstep_todo, $jobstepid;
959     }
960     $Job->{'tasks_summary'}->{'failed'}++;
961   }
962   else
963   {
964     ++$thisround_succeeded;
965     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
966     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
967     push @jobstep_done, $jobstepid;
968     Log ($jobstepid, "success in $elapsed seconds");
969   }
970   $Jobstep->{exitcode} = $childstatus;
971   $Jobstep->{finishtime} = time;
972   process_stderr ($jobstepid, $task_success);
973   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
974
975   close $reader{$jobstepid};
976   delete $reader{$jobstepid};
977   delete $slot[$proc{$pid}->{slot}]->{pid};
978   push @freeslot, $proc{$pid}->{slot};
979   delete $proc{$pid};
980
981   if ($task_success) {
982     # Load new tasks
983     my $newtask_list = [];
984     my $newtask_results;
985     do {
986       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
987         'where' => {
988           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
989         },
990         'order' => 'qsequence',
991         'offset' => scalar(@$newtask_list),
992       );
993       push(@$newtask_list, @{$newtask_results->{items}});
994     } while (@{$newtask_results->{items}});
995     foreach my $arvados_task (@$newtask_list) {
996       my $jobstep = {
997         'level' => $arvados_task->{'sequence'},
998         'failures' => 0,
999         'arvados_task' => $arvados_task
1000       };
1001       push @jobstep, $jobstep;
1002       push @jobstep_todo, $#jobstep;
1003     }
1004   }
1005
1006   $progress_is_dirty = 1;
1007   1;
1008 }
1009
1010 sub check_refresh_wanted
1011 {
1012   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1013   if (@stat && $stat[9] > $latest_refresh) {
1014     $latest_refresh = scalar time;
1015     if ($job_has_uuid) {
1016       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1017       for my $attr ('cancelled_at',
1018                     'cancelled_by_user_uuid',
1019                     'cancelled_by_client_uuid') {
1020         $Job->{$attr} = $Job2->{$attr};
1021       }
1022       if ($Job->{'cancelled_at'}) {
1023         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1024              " by user " . $Job->{cancelled_by_user_uuid});
1025         $main::success = 0;
1026         $main::please_freeze = 1;
1027       }
1028     }
1029   }
1030 }
1031
1032 sub check_squeue
1033 {
1034   # return if the kill list was checked <4 seconds ago
1035   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1036   {
1037     return;
1038   }
1039   $squeue_kill_checked = time;
1040
1041   # use killem() on procs whose killtime is reached
1042   for (keys %proc)
1043   {
1044     if (exists $proc{$_}->{killtime}
1045         && $proc{$_}->{killtime} <= time)
1046     {
1047       killem ($_);
1048     }
1049   }
1050
1051   # return if the squeue was checked <60 seconds ago
1052   if (defined $squeue_checked && $squeue_checked > time - 60)
1053   {
1054     return;
1055   }
1056   $squeue_checked = time;
1057
1058   if (!$have_slurm)
1059   {
1060     # here is an opportunity to check for mysterious problems with local procs
1061     return;
1062   }
1063
1064   # get a list of steps still running
1065   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1066   chop @squeue;
1067   if ($squeue[-1] ne "ok")
1068   {
1069     return;
1070   }
1071   pop @squeue;
1072
1073   # which of my jobsteps are running, according to squeue?
1074   my %ok;
1075   foreach (@squeue)
1076   {
1077     if (/^(\d+)\.(\d+) (\S+)/)
1078     {
1079       if ($1 eq $ENV{SLURM_JOBID})
1080       {
1081         $ok{$3} = 1;
1082       }
1083     }
1084   }
1085
1086   # which of my active child procs (>60s old) were not mentioned by squeue?
1087   foreach (keys %proc)
1088   {
1089     if ($proc{$_}->{time} < time - 60
1090         && !exists $ok{$proc{$_}->{jobstepname}}
1091         && !exists $proc{$_}->{killtime})
1092     {
1093       # kill this proc if it hasn't exited in 30 seconds
1094       $proc{$_}->{killtime} = time + 30;
1095     }
1096   }
1097 }
1098
1099
1100 sub release_allocation
1101 {
1102   if ($have_slurm)
1103   {
1104     Log (undef, "release job allocation");
1105     system "scancel $ENV{SLURM_JOBID}";
1106   }
1107 }
1108
1109
1110 sub readfrompipes
1111 {
1112   my $gotsome = 0;
1113   foreach my $job (keys %reader)
1114   {
1115     my $buf;
1116     while (0 < sysread ($reader{$job}, $buf, 8192))
1117     {
1118       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1119       $jobstep[$job]->{stderr} .= $buf;
1120       preprocess_stderr ($job);
1121       if (length ($jobstep[$job]->{stderr}) > 16384)
1122       {
1123         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1124       }
1125       $gotsome = 1;
1126     }
1127   }
1128   return $gotsome;
1129 }
1130
1131
1132 sub preprocess_stderr
1133 {
1134   my $job = shift;
1135
1136   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1137     my $line = $1;
1138     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1139     Log ($job, "stderr $line");
1140     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1141       # whoa.
1142       $main::please_freeze = 1;
1143     }
1144     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1145       $jobstep[$job]->{node_fail} = 1;
1146       ban_node_by_slot($jobstep[$job]->{slotindex});
1147     }
1148   }
1149 }
1150
1151
1152 sub process_stderr
1153 {
1154   my $job = shift;
1155   my $task_success = shift;
1156   preprocess_stderr ($job);
1157
1158   map {
1159     Log ($job, "stderr $_");
1160   } split ("\n", $jobstep[$job]->{stderr});
1161 }
1162
1163 sub fetch_block
1164 {
1165   my $hash = shift;
1166   my ($keep, $child_out, $output_block);
1167
1168   my $cmd = "arv-get \Q$hash\E";
1169   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1170   $output_block = '';
1171   while (1) {
1172     my $buf;
1173     my $bytes = sysread($keep, $buf, 1024 * 1024);
1174     if (!defined $bytes) {
1175       die "reading from arv-get: $!";
1176     } elsif ($bytes == 0) {
1177       # sysread returns 0 at the end of the pipe.
1178       last;
1179     } else {
1180       # some bytes were read into buf.
1181       $output_block .= $buf;
1182     }
1183   }
1184   close $keep;
1185   return $output_block;
1186 }
1187
1188 sub collate_output
1189 {
1190   Log (undef, "collate");
1191
1192   my ($child_out, $child_in);
1193   my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
1194   my $joboutput;
1195   for (@jobstep)
1196   {
1197     next if (!exists $_->{'arvados_task'}->{'output'} ||
1198              !$_->{'arvados_task'}->{'success'});
1199     my $output = $_->{'arvados_task'}->{output};
1200     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1201     {
1202       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1203       print $child_in $output;
1204     }
1205     elsif (@jobstep == 1)
1206     {
1207       $joboutput = $output;
1208       last;
1209     }
1210     elsif (defined (my $outblock = fetch_block ($output)))
1211     {
1212       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1213       print $child_in $outblock;
1214     }
1215     else
1216     {
1217       Log (undef, "XXX fetch_block($output) failed XXX");
1218       $main::success = 0;
1219     }
1220   }
1221   $child_in->close;
1222
1223   if (!defined $joboutput) {
1224     my $s = IO::Select->new($child_out);
1225     if ($s->can_read(120)) {
1226       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1227       chomp($joboutput);
1228     } else {
1229       Log (undef, "timed out reading from 'arv-put'");
1230     }
1231   }
1232   waitpid($pid, 0);
1233
1234   return $joboutput;
1235 }
1236
1237
1238 sub killem
1239 {
1240   foreach (@_)
1241   {
1242     my $sig = 2;                # SIGINT first
1243     if (exists $proc{$_}->{"sent_$sig"} &&
1244         time - $proc{$_}->{"sent_$sig"} > 4)
1245     {
1246       $sig = 15;                # SIGTERM if SIGINT doesn't work
1247     }
1248     if (exists $proc{$_}->{"sent_$sig"} &&
1249         time - $proc{$_}->{"sent_$sig"} > 4)
1250     {
1251       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1252     }
1253     if (!exists $proc{$_}->{"sent_$sig"})
1254     {
1255       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1256       kill $sig, $_;
1257       select (undef, undef, undef, 0.1);
1258       if ($sig == 2)
1259       {
1260         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1261       }
1262       $proc{$_}->{"sent_$sig"} = time;
1263       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1264     }
1265   }
1266 }
1267
1268
1269 sub fhbits
1270 {
1271   my($bits);
1272   for (@_) {
1273     vec($bits,fileno($_),1) = 1;
1274   }
1275   $bits;
1276 }
1277
1278
1279 sub Log                         # ($jobstep_id, $logmessage)
1280 {
1281   if ($_[1] =~ /\n/) {
1282     for my $line (split (/\n/, $_[1])) {
1283       Log ($_[0], $line);
1284     }
1285     return;
1286   }
1287   my $fh = select STDERR; $|=1; select $fh;
1288   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1289   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1290   $message .= "\n";
1291   my $datetime;
1292   if ($local_logfile || -t STDERR) {
1293     my @gmtime = gmtime;
1294     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1295                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1296   }
1297   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1298
1299   if ($local_logfile) {
1300     print $local_logfile $datetime . " " . $message;
1301   }
1302 }
1303
1304
1305 sub croak
1306 {
1307   my ($package, $file, $line) = caller;
1308   my $message = "@_ at $file line $line\n";
1309   Log (undef, $message);
1310   freeze() if @jobstep_todo;
1311   collate_output() if @jobstep_todo;
1312   cleanup();
1313   save_meta() if $local_logfile;
1314   die;
1315 }
1316
1317
1318 sub cleanup
1319 {
1320   return if !$job_has_uuid;
1321   $Job->update_attributes('running' => 0,
1322                           'success' => 0,
1323                           'finished_at' => scalar gmtime);
1324 }
1325
1326
1327 sub save_meta
1328 {
1329   my $justcheckpoint = shift; # false if this will be the last meta saved
1330   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1331
1332   $local_logfile->flush;
1333   my $cmd = "arv-put --filename ''\Q$keep_logfile\E "
1334       . quotemeta($local_logfile->filename);
1335   my $loglocator = `$cmd`;
1336   die "system $cmd failed: $?" if $?;
1337   chomp($loglocator);
1338
1339   $local_logfile = undef;   # the temp file is automatically deleted
1340   Log (undef, "log manifest is $loglocator");
1341   $Job->{'log'} = $loglocator;
1342   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1343 }
1344
1345
1346 sub freeze_if_want_freeze
1347 {
1348   if ($main::please_freeze)
1349   {
1350     release_allocation();
1351     if (@_)
1352     {
1353       # kill some srun procs before freeze+stop
1354       map { $proc{$_} = {} } @_;
1355       while (%proc)
1356       {
1357         killem (keys %proc);
1358         select (undef, undef, undef, 0.1);
1359         my $died;
1360         while (($died = waitpid (-1, WNOHANG)) > 0)
1361         {
1362           delete $proc{$died};
1363         }
1364       }
1365     }
1366     freeze();
1367     collate_output();
1368     cleanup();
1369     save_meta();
1370     exit 0;
1371   }
1372 }
1373
1374
1375 sub freeze
1376 {
1377   Log (undef, "Freeze not implemented");
1378   return;
1379 }
1380
1381
1382 sub thaw
1383 {
1384   croak ("Thaw not implemented");
1385 }
1386
1387
1388 sub freezequote
1389 {
1390   my $s = shift;
1391   $s =~ s/\\/\\\\/g;
1392   $s =~ s/\n/\\n/g;
1393   return $s;
1394 }
1395
1396
1397 sub freezeunquote
1398 {
1399   my $s = shift;
1400   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1401   return $s;
1402 }
1403
1404
1405 sub srun
1406 {
1407   my $srunargs = shift;
1408   my $execargs = shift;
1409   my $opts = shift || {};
1410   my $stdin = shift;
1411   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1412   print STDERR (join (" ",
1413                       map { / / ? "'$_'" : $_ }
1414                       (@$args)),
1415                 "\n")
1416       if $ENV{CRUNCH_DEBUG};
1417
1418   if (defined $stdin) {
1419     my $child = open STDIN, "-|";
1420     defined $child or die "no fork: $!";
1421     if ($child == 0) {
1422       print $stdin or die $!;
1423       close STDOUT or die $!;
1424       exit 0;
1425     }
1426   }
1427
1428   return system (@$args) if $opts->{fork};
1429
1430   exec @$args;
1431   warn "ENV size is ".length(join(" ",%ENV));
1432   die "exec failed: $!: @$args";
1433 }
1434
1435
1436 sub ban_node_by_slot {
1437   # Don't start any new jobsteps on this node for 60 seconds
1438   my $slotid = shift;
1439   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1440   $slot[$slotid]->{node}->{hold_count}++;
1441   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1442 }
1443
1444 sub must_lock_now
1445 {
1446   my ($lockfile, $error_message) = @_;
1447   open L, ">", $lockfile or croak("$lockfile: $!");
1448   if (!flock L, LOCK_EX|LOCK_NB) {
1449     croak("Can't lock $lockfile: $error_message\n");
1450   }
1451 }
1452
1453 sub find_docker_image {
1454   # Given a Keep locator, check to see if it contains a Docker image.
1455   # If so, return its stream name and Docker hash.
1456   # If not, return undef for both values.
1457   my $locator = shift;
1458   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1459     my @file_list = @{$image->{files}};
1460     if ((scalar(@file_list) == 1) &&
1461         ($file_list[0][1] =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1462       return ($file_list[0][0], $1);
1463     }
1464   }
1465   return (undef, undef);
1466 }
1467
1468 __DATA__
1469 #!/usr/bin/perl
1470
1471 # checkout-and-build
1472
1473 use Fcntl ':flock';
1474 use File::Path qw( make_path );
1475
1476 my $destdir = $ENV{"CRUNCH_SRC"};
1477 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1478 my $repo = $ENV{"CRUNCH_SRC_URL"};
1479 my $task_work = $ENV{"TASK_WORK"};
1480
1481 if ($task_work) {
1482     make_path $task_work;
1483     -e $task_work or die "Failed to create temporary working directory ($task_work): $!";
1484 }
1485
1486 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1487 flock L, LOCK_EX;
1488 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1489     exit 0;
1490 }
1491
1492 unlink "$destdir.commit";
1493 open STDOUT, ">", "$destdir.log";
1494 open STDERR, ">&STDOUT";
1495
1496 mkdir $destdir;
1497 my @git_archive_data = <DATA>;
1498 if (@git_archive_data) {
1499   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1500   print TARX @git_archive_data;
1501   if(!close(TARX)) {
1502     die "'tar -C $destdir -xf -' exited $?: $!";
1503   }
1504 }
1505
1506 my $pwd;
1507 chomp ($pwd = `pwd`);
1508 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1509 mkdir $install_dir;
1510
1511 for my $src_path ("$destdir/arvados/sdk/python") {
1512   if (-d $src_path) {
1513     shell_or_die ("virtualenv", $install_dir);
1514     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1515   }
1516 }
1517
1518 if (-e "$destdir/crunch_scripts/install") {
1519     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1520 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1521     # Old version
1522     shell_or_die ("./tests/autotests.sh", $install_dir);
1523 } elsif (-e "./install.sh") {
1524     shell_or_die ("./install.sh", $install_dir);
1525 }
1526
1527 if ($commit) {
1528     unlink "$destdir.commit.new";
1529     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1530     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1531 }
1532
1533 close L;
1534
1535 exit 0;
1536
1537 sub shell_or_die
1538 {
1539   if ($ENV{"DEBUG"}) {
1540     print STDERR "@_\n";
1541   }
1542   system (@_) == 0
1543       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1544 }
1545
1546 __DATA__