3775: Merge branch 'master' into 3775-fetch-git-repo
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path remove_tree );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $local_job = 0;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job;
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($jobspec =~ /^[-a-z\d]+$/)
152 {
153   # $jobspec is an Arvados UUID, not a JSON job specification
154   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155   if (!$force_unlock) {
156     # If some other crunch-job process has grabbed this job (or we see
157     # other evidence that the job is already underway) we exit
158     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159     # mark the job as failed.
160     if ($Job->{'is_locked_by_uuid'}) {
161       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
162       exit EX_TEMPFAIL;
163     }
164     if ($Job->{'state'} ne 'Queued') {
165       Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
166       exit EX_TEMPFAIL;
167     }
168     if ($Job->{'success'} ne undef) {
169       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
170       exit EX_TEMPFAIL;
171     }
172     if ($Job->{'running'}) {
173       Log(undef, "Job 'running' flag is already set");
174       exit EX_TEMPFAIL;
175     }
176     if ($Job->{'started_at'}) {
177       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
178       exit EX_TEMPFAIL;
179     }
180   }
181 }
182 else
183 {
184   $Job = JSON::decode_json($jobspec);
185
186   if (!$resume_stash)
187   {
188     map { croak ("No $_ specified") unless $Job->{$_} }
189     qw(script script_version script_parameters);
190   }
191
192   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
193   $Job->{'started_at'} = gmtime;
194
195   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
196 }
197 $job_id = $Job->{'uuid'};
198
199 my $keep_logfile = $job_id . '.log.txt';
200 $local_logfile = File::Temp->new();
201
202 $Job->{'runtime_constraints'} ||= {};
203 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
204 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
205
206
207 Log (undef, "check slurm allocation");
208 my @slot;
209 my @node;
210 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
211 my @sinfo;
212 if (!$have_slurm)
213 {
214   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
215   push @sinfo, "$localcpus localhost";
216 }
217 if (exists $ENV{SLURM_NODELIST})
218 {
219   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
220 }
221 foreach (@sinfo)
222 {
223   my ($ncpus, $slurm_nodelist) = split;
224   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
225
226   my @nodelist;
227   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
228   {
229     my $nodelist = $1;
230     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
231     {
232       my $ranges = $1;
233       foreach (split (",", $ranges))
234       {
235         my ($a, $b);
236         if (/(\d+)-(\d+)/)
237         {
238           $a = $1;
239           $b = $2;
240         }
241         else
242         {
243           $a = $_;
244           $b = $_;
245         }
246         push @nodelist, map {
247           my $n = $nodelist;
248           $n =~ s/\[[-,\d]+\]/$_/;
249           $n;
250         } ($a..$b);
251       }
252     }
253     else
254     {
255       push @nodelist, $nodelist;
256     }
257   }
258   foreach my $nodename (@nodelist)
259   {
260     Log (undef, "node $nodename - $ncpus slots");
261     my $node = { name => $nodename,
262                  ncpus => $ncpus,
263                  losing_streak => 0,
264                  hold_until => 0 };
265     foreach my $cpu (1..$ncpus)
266     {
267       push @slot, { node => $node,
268                     cpu => $cpu };
269     }
270   }
271   push @node, @nodelist;
272 }
273
274
275
276 # Ensure that we get one jobstep running on each allocated node before
277 # we start overloading nodes with concurrent steps
278
279 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
280
281
282
283 my $jobmanager_id;
284 # Claim this job, and make sure nobody else does
285 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
286         $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
287   Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
288   exit EX_TEMPFAIL;
289 }
290 $Job->update_attributes('state' => 'Running',
291                         'tasks_summary' => { 'failed' => 0,
292                                              'todo' => 1,
293                                              'running' => 0,
294                                              'done' => 0 });
295
296
297 Log (undef, "start");
298 $SIG{'INT'} = sub { $main::please_freeze = 1; };
299 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
300 $SIG{'TERM'} = \&croak;
301 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
302 $SIG{'ALRM'} = sub { $main::please_info = 1; };
303 $SIG{'CONT'} = sub { $main::please_continue = 1; };
304 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
305
306 $main::please_freeze = 0;
307 $main::please_info = 0;
308 $main::please_continue = 0;
309 $main::please_refresh = 0;
310 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
311
312 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
313 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
314 $ENV{"JOB_UUID"} = $job_id;
315
316
317 my @jobstep;
318 my @jobstep_todo = ();
319 my @jobstep_done = ();
320 my @jobstep_tomerge = ();
321 my $jobstep_tomerge_level = 0;
322 my $squeue_checked;
323 my $squeue_kill_checked;
324 my $output_in_keep = 0;
325 my $latest_refresh = scalar time;
326
327
328
329 if (defined $Job->{thawedfromkey})
330 {
331   thaw ($Job->{thawedfromkey});
332 }
333 else
334 {
335   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
336     'job_uuid' => $Job->{'uuid'},
337     'sequence' => 0,
338     'qsequence' => 0,
339     'parameters' => {},
340                                                           });
341   push @jobstep, { 'level' => 0,
342                    'failures' => 0,
343                    'arvados_task' => $first_task,
344                  };
345   push @jobstep_todo, 0;
346 }
347
348
349 if (!$have_slurm)
350 {
351   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
352 }
353
354
355 my $build_script;
356 do {
357   local $/ = undef;
358   $build_script = <DATA>;
359 };
360 my $nodelist = join(",", @node);
361
362 if (!defined $no_clear_tmp) {
363   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
364   Log (undef, "Clean work dirs");
365
366   my $cleanpid = fork();
367   if ($cleanpid == 0)
368   {
369     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
370           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
371     exit (1);
372   }
373   while (1)
374   {
375     last if $cleanpid == waitpid (-1, WNOHANG);
376     freeze_if_want_freeze ($cleanpid);
377     select (undef, undef, undef, 0.1);
378   }
379   Log (undef, "Cleanup command exited ".exit_status_s($?));
380 }
381
382
383 my $git_archive;
384 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
385   # If we're in user-land (i.e., not called from crunch-dispatch)
386   # script_version can be an absolute directory path, signifying we
387   # should work straight out of that directory instead of using a git
388   # commit.
389   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
390   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
391 }
392 else {
393   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
394
395   # Install requested code version
396   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
397
398   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
399
400   # If we're running under crunch-dispatch, it will have already
401   # pulled the appropriate source tree into its own repository, and
402   # given us that repo's path as $git_dir.
403   #
404   # If we're running a "local" job, we might have to fetch content
405   # from a remote repository.
406   #
407   # (Currently crunch-dispatch gives a local path with --git-dir, but
408   # we might as well accept URLs there too in case it changes its
409   # mind.)
410   my $repo = $git_dir || $Job->{'repository'};
411
412   # Repository can be remote or local. If remote, we'll need to fetch it
413   # to a local dir before doing `git log` et al.
414   my $repo_location;
415
416   if ($repo =~ m{://|\@.*:}) {
417     # $repo is a git url we can clone, like git:// or https:// or
418     # file:/// or git@host:repo.git
419     $repo_location = 'remote';
420   } elsif ($repo =~ m{^\.*/}) {
421     # $repo is a local path to a git index. We'll also resolve ../foo
422     # to ../foo/.git if the latter is a directory.
423     if (-d "$repo/.git") {
424       $repo = "$repo/.git";
425     }
426     $repo_location = 'local';
427   } else {
428     # $repo is none of the above. It must be the name of a hosted
429     # repository.
430     my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
431       'filters' => [['name','=',$repo]]
432         )->{'items'};
433     my $n_found = scalar @{$arv_repo_list};
434     if ($n_found > 0) {
435       Log(undef, "Repository '$repo' -> "
436           . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
437     }
438     if ($n_found != 1) {
439       croak("Error: Found $n_found repositories with name '$repo'.");
440     }
441     $repo = $arv_repo_list->[0]->{'fetch_url'};
442     $repo_location = 'remote';
443   }
444   Log(undef, "Using $repo_location repository '$repo'");
445   $ENV{"CRUNCH_SRC_URL"} = $repo;
446
447   # Resolve given script_version (we'll call that $treeish here) to a
448   # commit sha1 ($commit).
449   my $treeish = $Job->{'script_version'};
450   my $commit;
451   if ($repo_location eq 'remote') {
452     # We minimize excess object-fetching by re-using the same bare
453     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
454     # just keep adding remotes to it as needed.
455     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
456     my $gitcmd = "git --git-dir=\Q$local_repo\E";
457
458     # Set up our local repo for caching remote objects, making
459     # archives, etc.
460     if (!-d $local_repo) {
461       make_path($local_repo) or croak("Error: could not create $local_repo");
462     }
463     # This works (exits 0 and doesn't delete fetched objects) even
464     # if $local_repo is already initialized:
465     `$gitcmd init --bare`;
466     if ($?) {
467       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
468     }
469
470     # If $treeish looks like a hash (or abbrev hash) we look it up in
471     # our local cache first, since that's cheaper. (We don't want to
472     # do that with tags/branches though -- those change over time, so
473     # they should always be resolved by the remote repo.)
474     if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
475       # Hide stderr because it's normal for this to fail:
476       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
477       if ($? == 0 &&
478           $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
479           $sha1 =~ /^([0-9a-f]{40})$/s) {
480         $commit = $1;
481         Log(undef, "Commit $commit already present in $local_repo");
482       }
483     }
484
485     if (!defined $commit) {
486       # If $treeish isn't just a hash or abbrev hash, or isn't here
487       # yet, we need to fetch the remote to resolve it correctly.
488
489       # First, remove all local heads. This prevents a name that does
490       # not exist on the remote from resolving to (or colliding with)
491       # a previously fetched branch or tag (possibly from a different
492       # remote).
493       remove_tree("$local_repo/refs/heads", {keep_root => 1});
494
495       Log(undef, "Fetching objects from $repo to $local_repo");
496       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
497       if ($?) {
498         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
499       }
500     }
501
502     # Now that the data is all here, we will use our local repo for
503     # the rest of our git activities.
504     $repo = $local_repo;
505   }
506
507   my $gitcmd = "git --git-dir=\Q$repo\E";
508   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
509   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
510     croak("`$gitcmd rev-list` exited "
511           .exit_status_s($?)
512           .", '$treeish' not found. Giving up.");
513   }
514   $commit = $1;
515   Log(undef, "Version $treeish is commit $commit");
516
517   if ($commit ne $Job->{'script_version'}) {
518     # Record the real commit id in the database, frozentokey, logs,
519     # etc. -- instead of an abbreviation or a branch name which can
520     # become ambiguous or point to a different commit in the future.
521     if (!$Job->update_attributes('script_version' => $commit)) {
522       croak("Error: failed to update job's script_version attribute");
523     }
524   }
525
526   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
527   $git_archive = `$gitcmd archive ''\Q$commit\E`;
528   if ($?) {
529     croak("Error: $gitcmd archive exited ".exit_status_s($?));
530   }
531 }
532
533 if (!defined $git_archive) {
534   Log(undef, "Skip install phase (no git archive)");
535   if ($have_slurm) {
536     Log(undef, "Warning: This probably means workers have no source tree!");
537   }
538 }
539 else {
540   Log(undef, "Run install script on all workers");
541
542   my @srunargs = ("srun",
543                   "--nodelist=$nodelist",
544                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
545   my @execargs = ("sh", "-c",
546                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
547
548   # Note: this section is almost certainly unnecessary if we're
549   # running tasks in docker containers.
550   my $installpid = fork();
551   if ($installpid == 0)
552   {
553     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
554     exit (1);
555   }
556   while (1)
557   {
558     last if $installpid == waitpid (-1, WNOHANG);
559     freeze_if_want_freeze ($installpid);
560     select (undef, undef, undef, 0.1);
561   }
562   Log (undef, "Install script exited ".exit_status_s($?));
563 }
564
565 if (!$have_slurm)
566 {
567   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
568   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
569 }
570
571 # If this job requires a Docker image, install that.
572 my $docker_bin = "/usr/bin/docker.io";
573 my ($docker_locator, $docker_stream, $docker_hash);
574 if ($docker_locator = $Job->{docker_image_locator}) {
575   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
576   if (!$docker_hash)
577   {
578     croak("No Docker image hash found from locator $docker_locator");
579   }
580   $docker_stream =~ s/^\.//;
581   my $docker_install_script = qq{
582 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
583     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
584 fi
585 };
586   my $docker_pid = fork();
587   if ($docker_pid == 0)
588   {
589     srun (["srun", "--nodelist=" . join(',', @node)],
590           ["/bin/sh", "-ec", $docker_install_script]);
591     exit ($?);
592   }
593   while (1)
594   {
595     last if $docker_pid == waitpid (-1, WNOHANG);
596     freeze_if_want_freeze ($docker_pid);
597     select (undef, undef, undef, 0.1);
598   }
599   if ($? != 0)
600   {
601     croak("Installing Docker image from $docker_locator exited "
602           .exit_status_s($?));
603   }
604 }
605
606 foreach (qw (script script_version script_parameters runtime_constraints))
607 {
608   Log (undef,
609        "$_ " .
610        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
611 }
612 foreach (split (/\n/, $Job->{knobs}))
613 {
614   Log (undef, "knob " . $_);
615 }
616
617
618
619 $main::success = undef;
620
621
622
623 ONELEVEL:
624
625 my $thisround_succeeded = 0;
626 my $thisround_failed = 0;
627 my $thisround_failed_multiple = 0;
628
629 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
630                        or $a <=> $b } @jobstep_todo;
631 my $level = $jobstep[$jobstep_todo[0]]->{level};
632 Log (undef, "start level $level");
633
634
635
636 my %proc;
637 my @freeslot = (0..$#slot);
638 my @holdslot;
639 my %reader;
640 my $progress_is_dirty = 1;
641 my $progress_stats_updated = 0;
642
643 update_progress_stats();
644
645
646
647 THISROUND:
648 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
649 {
650   my $id = $jobstep_todo[$todo_ptr];
651   my $Jobstep = $jobstep[$id];
652   if ($Jobstep->{level} != $level)
653   {
654     next;
655   }
656
657   pipe $reader{$id}, "writer" or croak ($!);
658   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
659   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
660
661   my $childslot = $freeslot[0];
662   my $childnode = $slot[$childslot]->{node};
663   my $childslotname = join (".",
664                             $slot[$childslot]->{node}->{name},
665                             $slot[$childslot]->{cpu});
666   my $childpid = fork();
667   if ($childpid == 0)
668   {
669     $SIG{'INT'} = 'DEFAULT';
670     $SIG{'QUIT'} = 'DEFAULT';
671     $SIG{'TERM'} = 'DEFAULT';
672
673     foreach (values (%reader))
674     {
675       close($_);
676     }
677     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
678     open(STDOUT,">&writer");
679     open(STDERR,">&writer");
680
681     undef $dbh;
682     undef $sth;
683
684     delete $ENV{"GNUPGHOME"};
685     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
686     $ENV{"TASK_QSEQUENCE"} = $id;
687     $ENV{"TASK_SEQUENCE"} = $level;
688     $ENV{"JOB_SCRIPT"} = $Job->{script};
689     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
690       $param =~ tr/a-z/A-Z/;
691       $ENV{"JOB_PARAMETER_$param"} = $value;
692     }
693     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
694     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
695     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
696     $ENV{"HOME"} = $ENV{"TASK_WORK"};
697     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
698     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
699     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
700     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
701
702     $ENV{"GZIP"} = "-n";
703
704     my @srunargs = (
705       "srun",
706       "--nodelist=".$childnode->{name},
707       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
708       "--job-name=$job_id.$id.$$",
709         );
710     my $build_script_to_send = "";
711     my $command =
712         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
713         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
714         ."&& cd $ENV{CRUNCH_TMP} ";
715     if ($build_script)
716     {
717       $build_script_to_send = $build_script;
718       $command .=
719           "&& perl -";
720     }
721     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
722     if ($docker_hash)
723     {
724       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
725       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
726       # Dynamically configure the container to use the host system as its
727       # DNS server.  Get the host's global addresses from the ip command,
728       # and turn them into docker --dns options using gawk.
729       $command .=
730           q{$(ip -o address show scope global |
731               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
732       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
733       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
734       $command .= "--env=\QHOME=/home/crunch\E ";
735       while (my ($env_key, $env_val) = each %ENV)
736       {
737         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
738           if ($env_key eq "TASK_WORK") {
739             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
740           }
741           elsif ($env_key eq "TASK_KEEPMOUNT") {
742             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
743           }
744           else {
745             $command .= "--env=\Q$env_key=$env_val\E ";
746           }
747         }
748       }
749       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
750       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
751       $command .= "\Q$docker_hash\E ";
752       $command .= "stdbuf --output=0 --error=0 ";
753       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
754     } else {
755       # Non-docker run
756       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
757       $command .= "stdbuf --output=0 --error=0 ";
758       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
759     }
760
761     my @execargs = ('bash', '-c', $command);
762     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
763     # exec() failed, we assume nothing happened.
764     Log(undef, "srun() failed on build script");
765     die;
766   }
767   close("writer");
768   if (!defined $childpid)
769   {
770     close $reader{$id};
771     delete $reader{$id};
772     next;
773   }
774   shift @freeslot;
775   $proc{$childpid} = { jobstep => $id,
776                        time => time,
777                        slot => $childslot,
778                        jobstepname => "$job_id.$id.$childpid",
779                      };
780   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
781   $slot[$childslot]->{pid} = $childpid;
782
783   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
784   Log ($id, "child $childpid started on $childslotname");
785   $Jobstep->{starttime} = time;
786   $Jobstep->{node} = $childnode->{name};
787   $Jobstep->{slotindex} = $childslot;
788   delete $Jobstep->{stderr};
789   delete $Jobstep->{finishtime};
790
791   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
792   $Jobstep->{'arvados_task'}->save;
793
794   splice @jobstep_todo, $todo_ptr, 1;
795   --$todo_ptr;
796
797   $progress_is_dirty = 1;
798
799   while (!@freeslot
800          ||
801          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
802   {
803     last THISROUND if $main::please_freeze;
804     if ($main::please_info)
805     {
806       $main::please_info = 0;
807       freeze();
808       collate_output();
809       save_meta(1);
810       update_progress_stats();
811     }
812     my $gotsome
813         = readfrompipes ()
814         + reapchildren ();
815     if (!$gotsome)
816     {
817       check_refresh_wanted();
818       check_squeue();
819       update_progress_stats();
820       select (undef, undef, undef, 0.1);
821     }
822     elsif (time - $progress_stats_updated >= 30)
823     {
824       update_progress_stats();
825     }
826     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
827         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
828     {
829       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
830           .($thisround_failed+$thisround_succeeded)
831           .") -- giving up on this round";
832       Log (undef, $message);
833       last THISROUND;
834     }
835
836     # move slots from freeslot to holdslot (or back to freeslot) if necessary
837     for (my $i=$#freeslot; $i>=0; $i--) {
838       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
839         push @holdslot, (splice @freeslot, $i, 1);
840       }
841     }
842     for (my $i=$#holdslot; $i>=0; $i--) {
843       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
844         push @freeslot, (splice @holdslot, $i, 1);
845       }
846     }
847
848     # give up if no nodes are succeeding
849     if (!grep { $_->{node}->{losing_streak} == 0 &&
850                     $_->{node}->{hold_count} < 4 } @slot) {
851       my $message = "Every node has failed -- giving up on this round";
852       Log (undef, $message);
853       last THISROUND;
854     }
855   }
856 }
857
858
859 push @freeslot, splice @holdslot;
860 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
861
862
863 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
864 while (%proc)
865 {
866   if ($main::please_continue) {
867     $main::please_continue = 0;
868     goto THISROUND;
869   }
870   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
871   readfrompipes ();
872   if (!reapchildren())
873   {
874     check_refresh_wanted();
875     check_squeue();
876     update_progress_stats();
877     select (undef, undef, undef, 0.1);
878     killem (keys %proc) if $main::please_freeze;
879   }
880 }
881
882 update_progress_stats();
883 freeze_if_want_freeze();
884
885
886 if (!defined $main::success)
887 {
888   if (@jobstep_todo &&
889       $thisround_succeeded == 0 &&
890       ($thisround_failed == 0 || $thisround_failed > 4))
891   {
892     my $message = "stop because $thisround_failed tasks failed and none succeeded";
893     Log (undef, $message);
894     $main::success = 0;
895   }
896   if (!@jobstep_todo)
897   {
898     $main::success = 1;
899   }
900 }
901
902 goto ONELEVEL if !defined $main::success;
903
904
905 release_allocation();
906 freeze();
907 my $collated_output = &collate_output();
908
909 if (!$collated_output) {
910   Log(undef, "output undef");
911 }
912 else {
913   eval {
914     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
915         or die "failed to get collated manifest: $!";
916     my $orig_manifest_text = '';
917     while (my $manifest_line = <$orig_manifest>) {
918       $orig_manifest_text .= $manifest_line;
919     }
920     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
921       'manifest_text' => $orig_manifest_text,
922     });
923     Log(undef, "output uuid " . $output->{uuid});
924     Log(undef, "output hash " . $output->{portable_data_hash});
925     $Job->update_attributes('output' => $output->{portable_data_hash});
926   };
927   if ($@) {
928     Log (undef, "Failed to register output manifest: $@");
929   }
930 }
931
932 Log (undef, "finish");
933
934 save_meta();
935
936 my $final_state;
937 if ($collated_output && $main::success) {
938   $final_state = 'Complete';
939 } else {
940   $final_state = 'Failed';
941 }
942 $Job->update_attributes('state' => $final_state);
943
944 exit (($final_state eq 'Complete') ? 0 : 1);
945
946
947
948 sub update_progress_stats
949 {
950   $progress_stats_updated = time;
951   return if !$progress_is_dirty;
952   my ($todo, $done, $running) = (scalar @jobstep_todo,
953                                  scalar @jobstep_done,
954                                  scalar @slot - scalar @freeslot - scalar @holdslot);
955   $Job->{'tasks_summary'} ||= {};
956   $Job->{'tasks_summary'}->{'todo'} = $todo;
957   $Job->{'tasks_summary'}->{'done'} = $done;
958   $Job->{'tasks_summary'}->{'running'} = $running;
959   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
960   Log (undef, "status: $done done, $running running, $todo todo");
961   $progress_is_dirty = 0;
962 }
963
964
965
966 sub reapchildren
967 {
968   my $pid = waitpid (-1, WNOHANG);
969   return 0 if $pid <= 0;
970
971   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
972                   . "."
973                   . $slot[$proc{$pid}->{slot}]->{cpu});
974   my $jobstepid = $proc{$pid}->{jobstep};
975   my $elapsed = time - $proc{$pid}->{time};
976   my $Jobstep = $jobstep[$jobstepid];
977
978   my $childstatus = $?;
979   my $exitvalue = $childstatus >> 8;
980   my $exitinfo = "exit ".exit_status_s($childstatus);
981   $Jobstep->{'arvados_task'}->reload;
982   my $task_success = $Jobstep->{'arvados_task'}->{success};
983
984   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
985
986   if (!defined $task_success) {
987     # task did not indicate one way or the other --> fail
988     $Jobstep->{'arvados_task'}->{success} = 0;
989     $Jobstep->{'arvados_task'}->save;
990     $task_success = 0;
991   }
992
993   if (!$task_success)
994   {
995     my $temporary_fail;
996     $temporary_fail ||= $Jobstep->{node_fail};
997     $temporary_fail ||= ($exitvalue == 111);
998
999     ++$thisround_failed;
1000     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1001
1002     # Check for signs of a failed or misconfigured node
1003     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1004         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1005       # Don't count this against jobstep failure thresholds if this
1006       # node is already suspected faulty and srun exited quickly
1007       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1008           $elapsed < 5) {
1009         Log ($jobstepid, "blaming failure on suspect node " .
1010              $slot[$proc{$pid}->{slot}]->{node}->{name});
1011         $temporary_fail ||= 1;
1012       }
1013       ban_node_by_slot($proc{$pid}->{slot});
1014     }
1015
1016     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1017                              ++$Jobstep->{'failures'},
1018                              $temporary_fail ? 'temporary ' : 'permanent',
1019                              $elapsed));
1020
1021     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1022       # Give up on this task, and the whole job
1023       $main::success = 0;
1024       $main::please_freeze = 1;
1025     }
1026     # Put this task back on the todo queue
1027     push @jobstep_todo, $jobstepid;
1028     $Job->{'tasks_summary'}->{'failed'}++;
1029   }
1030   else
1031   {
1032     ++$thisround_succeeded;
1033     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1034     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1035     push @jobstep_done, $jobstepid;
1036     Log ($jobstepid, "success in $elapsed seconds");
1037   }
1038   $Jobstep->{exitcode} = $childstatus;
1039   $Jobstep->{finishtime} = time;
1040   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1041   $Jobstep->{'arvados_task'}->save;
1042   process_stderr ($jobstepid, $task_success);
1043   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1044
1045   close $reader{$jobstepid};
1046   delete $reader{$jobstepid};
1047   delete $slot[$proc{$pid}->{slot}]->{pid};
1048   push @freeslot, $proc{$pid}->{slot};
1049   delete $proc{$pid};
1050
1051   if ($task_success) {
1052     # Load new tasks
1053     my $newtask_list = [];
1054     my $newtask_results;
1055     do {
1056       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1057         'where' => {
1058           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1059         },
1060         'order' => 'qsequence',
1061         'offset' => scalar(@$newtask_list),
1062       );
1063       push(@$newtask_list, @{$newtask_results->{items}});
1064     } while (@{$newtask_results->{items}});
1065     foreach my $arvados_task (@$newtask_list) {
1066       my $jobstep = {
1067         'level' => $arvados_task->{'sequence'},
1068         'failures' => 0,
1069         'arvados_task' => $arvados_task
1070       };
1071       push @jobstep, $jobstep;
1072       push @jobstep_todo, $#jobstep;
1073     }
1074   }
1075
1076   $progress_is_dirty = 1;
1077   1;
1078 }
1079
1080 sub check_refresh_wanted
1081 {
1082   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1083   if (@stat && $stat[9] > $latest_refresh) {
1084     $latest_refresh = scalar time;
1085     my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1086     for my $attr ('cancelled_at',
1087                   'cancelled_by_user_uuid',
1088                   'cancelled_by_client_uuid',
1089                   'state') {
1090       $Job->{$attr} = $Job2->{$attr};
1091     }
1092     if ($Job->{'state'} ne "Running") {
1093       if ($Job->{'state'} eq "Cancelled") {
1094         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1095       } else {
1096         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1097       }
1098       $main::success = 0;
1099       $main::please_freeze = 1;
1100     }
1101   }
1102 }
1103
1104 sub check_squeue
1105 {
1106   # return if the kill list was checked <4 seconds ago
1107   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1108   {
1109     return;
1110   }
1111   $squeue_kill_checked = time;
1112
1113   # use killem() on procs whose killtime is reached
1114   for (keys %proc)
1115   {
1116     if (exists $proc{$_}->{killtime}
1117         && $proc{$_}->{killtime} <= time)
1118     {
1119       killem ($_);
1120     }
1121   }
1122
1123   # return if the squeue was checked <60 seconds ago
1124   if (defined $squeue_checked && $squeue_checked > time - 60)
1125   {
1126     return;
1127   }
1128   $squeue_checked = time;
1129
1130   if (!$have_slurm)
1131   {
1132     # here is an opportunity to check for mysterious problems with local procs
1133     return;
1134   }
1135
1136   # get a list of steps still running
1137   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1138   chop @squeue;
1139   if ($squeue[-1] ne "ok")
1140   {
1141     return;
1142   }
1143   pop @squeue;
1144
1145   # which of my jobsteps are running, according to squeue?
1146   my %ok;
1147   foreach (@squeue)
1148   {
1149     if (/^(\d+)\.(\d+) (\S+)/)
1150     {
1151       if ($1 eq $ENV{SLURM_JOBID})
1152       {
1153         $ok{$3} = 1;
1154       }
1155     }
1156   }
1157
1158   # which of my active child procs (>60s old) were not mentioned by squeue?
1159   foreach (keys %proc)
1160   {
1161     if ($proc{$_}->{time} < time - 60
1162         && !exists $ok{$proc{$_}->{jobstepname}}
1163         && !exists $proc{$_}->{killtime})
1164     {
1165       # kill this proc if it hasn't exited in 30 seconds
1166       $proc{$_}->{killtime} = time + 30;
1167     }
1168   }
1169 }
1170
1171
1172 sub release_allocation
1173 {
1174   if ($have_slurm)
1175   {
1176     Log (undef, "release job allocation");
1177     system "scancel $ENV{SLURM_JOBID}";
1178   }
1179 }
1180
1181
1182 sub readfrompipes
1183 {
1184   my $gotsome = 0;
1185   foreach my $job (keys %reader)
1186   {
1187     my $buf;
1188     while (0 < sysread ($reader{$job}, $buf, 8192))
1189     {
1190       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1191       $jobstep[$job]->{stderr} .= $buf;
1192       preprocess_stderr ($job);
1193       if (length ($jobstep[$job]->{stderr}) > 16384)
1194       {
1195         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1196       }
1197       $gotsome = 1;
1198     }
1199   }
1200   return $gotsome;
1201 }
1202
1203
1204 sub preprocess_stderr
1205 {
1206   my $job = shift;
1207
1208   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1209     my $line = $1;
1210     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1211     Log ($job, "stderr $line");
1212     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1213       # whoa.
1214       $main::please_freeze = 1;
1215     }
1216     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1217       $jobstep[$job]->{node_fail} = 1;
1218       ban_node_by_slot($jobstep[$job]->{slotindex});
1219     }
1220   }
1221 }
1222
1223
1224 sub process_stderr
1225 {
1226   my $job = shift;
1227   my $task_success = shift;
1228   preprocess_stderr ($job);
1229
1230   map {
1231     Log ($job, "stderr $_");
1232   } split ("\n", $jobstep[$job]->{stderr});
1233 }
1234
1235 sub fetch_block
1236 {
1237   my $hash = shift;
1238   my ($keep, $child_out, $output_block);
1239
1240   my $cmd = "arv-get \Q$hash\E";
1241   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1242   $output_block = '';
1243   while (1) {
1244     my $buf;
1245     my $bytes = sysread($keep, $buf, 1024 * 1024);
1246     if (!defined $bytes) {
1247       die "reading from arv-get: $!";
1248     } elsif ($bytes == 0) {
1249       # sysread returns 0 at the end of the pipe.
1250       last;
1251     } else {
1252       # some bytes were read into buf.
1253       $output_block .= $buf;
1254     }
1255   }
1256   close $keep;
1257   return $output_block;
1258 }
1259
1260 sub collate_output
1261 {
1262   Log (undef, "collate");
1263
1264   my ($child_out, $child_in);
1265   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1266                   '--retries', put_retry_count());
1267   my $joboutput;
1268   for (@jobstep)
1269   {
1270     next if (!exists $_->{'arvados_task'}->{'output'} ||
1271              !$_->{'arvados_task'}->{'success'});
1272     my $output = $_->{'arvados_task'}->{output};
1273     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1274     {
1275       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1276       print $child_in $output;
1277     }
1278     elsif (@jobstep == 1)
1279     {
1280       $joboutput = $output;
1281       last;
1282     }
1283     elsif (defined (my $outblock = fetch_block ($output)))
1284     {
1285       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1286       print $child_in $outblock;
1287     }
1288     else
1289     {
1290       Log (undef, "XXX fetch_block($output) failed XXX");
1291       $main::success = 0;
1292     }
1293   }
1294   $child_in->close;
1295
1296   if (!defined $joboutput) {
1297     my $s = IO::Select->new($child_out);
1298     if ($s->can_read(120)) {
1299       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1300       chomp($joboutput);
1301       # TODO: Ensure exit status == 0.
1302     } else {
1303       Log (undef, "timed out reading from 'arv-put'");
1304     }
1305   }
1306   # TODO: kill $pid instead of waiting, now that we've decided to
1307   # ignore further output.
1308   waitpid($pid, 0);
1309
1310   return $joboutput;
1311 }
1312
1313
1314 sub killem
1315 {
1316   foreach (@_)
1317   {
1318     my $sig = 2;                # SIGINT first
1319     if (exists $proc{$_}->{"sent_$sig"} &&
1320         time - $proc{$_}->{"sent_$sig"} > 4)
1321     {
1322       $sig = 15;                # SIGTERM if SIGINT doesn't work
1323     }
1324     if (exists $proc{$_}->{"sent_$sig"} &&
1325         time - $proc{$_}->{"sent_$sig"} > 4)
1326     {
1327       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1328     }
1329     if (!exists $proc{$_}->{"sent_$sig"})
1330     {
1331       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1332       kill $sig, $_;
1333       select (undef, undef, undef, 0.1);
1334       if ($sig == 2)
1335       {
1336         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1337       }
1338       $proc{$_}->{"sent_$sig"} = time;
1339       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1340     }
1341   }
1342 }
1343
1344
1345 sub fhbits
1346 {
1347   my($bits);
1348   for (@_) {
1349     vec($bits,fileno($_),1) = 1;
1350   }
1351   $bits;
1352 }
1353
1354
1355 sub Log                         # ($jobstep_id, $logmessage)
1356 {
1357   if ($_[1] =~ /\n/) {
1358     for my $line (split (/\n/, $_[1])) {
1359       Log ($_[0], $line);
1360     }
1361     return;
1362   }
1363   my $fh = select STDERR; $|=1; select $fh;
1364   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1365   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1366   $message .= "\n";
1367   my $datetime;
1368   if ($local_logfile || -t STDERR) {
1369     my @gmtime = gmtime;
1370     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1371                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1372   }
1373   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1374
1375   if ($local_logfile) {
1376     print $local_logfile $datetime . " " . $message;
1377   }
1378 }
1379
1380
1381 sub croak
1382 {
1383   my ($package, $file, $line) = caller;
1384   my $message = "@_ at $file line $line\n";
1385   Log (undef, $message);
1386   freeze() if @jobstep_todo;
1387   collate_output() if @jobstep_todo;
1388   cleanup() if $Job;
1389   save_meta() if $local_logfile;
1390   die;
1391 }
1392
1393
1394 sub cleanup
1395 {
1396   if ($Job->{'state'} eq 'Cancelled') {
1397     $Job->update_attributes('finished_at' => scalar gmtime);
1398   } else {
1399     $Job->update_attributes('state' => 'Failed');
1400   }
1401 }
1402
1403
1404 sub save_meta
1405 {
1406   my $justcheckpoint = shift; # false if this will be the last meta saved
1407   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1408
1409   $local_logfile->flush;
1410   my $retry_count = put_retry_count();
1411   my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1412       "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1413   my $loglocator = `$cmd`;
1414   die "system $cmd exited ".exit_status_s($?) if $?;
1415   chomp($loglocator);
1416
1417   $local_logfile = undef;   # the temp file is automatically deleted
1418   Log (undef, "log manifest is $loglocator");
1419   $Job->{'log'} = $loglocator;
1420   $Job->update_attributes('log', $loglocator);
1421 }
1422
1423
1424 sub freeze_if_want_freeze
1425 {
1426   if ($main::please_freeze)
1427   {
1428     release_allocation();
1429     if (@_)
1430     {
1431       # kill some srun procs before freeze+stop
1432       map { $proc{$_} = {} } @_;
1433       while (%proc)
1434       {
1435         killem (keys %proc);
1436         select (undef, undef, undef, 0.1);
1437         my $died;
1438         while (($died = waitpid (-1, WNOHANG)) > 0)
1439         {
1440           delete $proc{$died};
1441         }
1442       }
1443     }
1444     freeze();
1445     collate_output();
1446     cleanup();
1447     save_meta();
1448     exit 1;
1449   }
1450 }
1451
1452
1453 sub freeze
1454 {
1455   Log (undef, "Freeze not implemented");
1456   return;
1457 }
1458
1459
1460 sub thaw
1461 {
1462   croak ("Thaw not implemented");
1463 }
1464
1465
1466 sub freezequote
1467 {
1468   my $s = shift;
1469   $s =~ s/\\/\\\\/g;
1470   $s =~ s/\n/\\n/g;
1471   return $s;
1472 }
1473
1474
1475 sub freezeunquote
1476 {
1477   my $s = shift;
1478   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1479   return $s;
1480 }
1481
1482
1483 sub srun
1484 {
1485   my $srunargs = shift;
1486   my $execargs = shift;
1487   my $opts = shift || {};
1488   my $stdin = shift;
1489   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1490   print STDERR (join (" ",
1491                       map { / / ? "'$_'" : $_ }
1492                       (@$args)),
1493                 "\n")
1494       if $ENV{CRUNCH_DEBUG};
1495
1496   if (defined $stdin) {
1497     my $child = open STDIN, "-|";
1498     defined $child or die "no fork: $!";
1499     if ($child == 0) {
1500       print $stdin or die $!;
1501       close STDOUT or die $!;
1502       exit 0;
1503     }
1504   }
1505
1506   return system (@$args) if $opts->{fork};
1507
1508   exec @$args;
1509   warn "ENV size is ".length(join(" ",%ENV));
1510   die "exec failed: $!: @$args";
1511 }
1512
1513
1514 sub ban_node_by_slot {
1515   # Don't start any new jobsteps on this node for 60 seconds
1516   my $slotid = shift;
1517   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1518   $slot[$slotid]->{node}->{hold_count}++;
1519   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1520 }
1521
1522 sub must_lock_now
1523 {
1524   my ($lockfile, $error_message) = @_;
1525   open L, ">", $lockfile or croak("$lockfile: $!");
1526   if (!flock L, LOCK_EX|LOCK_NB) {
1527     croak("Can't lock $lockfile: $error_message\n");
1528   }
1529 }
1530
1531 sub find_docker_image {
1532   # Given a Keep locator, check to see if it contains a Docker image.
1533   # If so, return its stream name and Docker hash.
1534   # If not, return undef for both values.
1535   my $locator = shift;
1536   my ($streamname, $filename);
1537   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1538     foreach my $line (split(/\n/, $image->{manifest_text})) {
1539       my @tokens = split(/\s+/, $line);
1540       next if (!@tokens);
1541       $streamname = shift(@tokens);
1542       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1543         if (defined($filename)) {
1544           return (undef, undef);  # More than one file in the Collection.
1545         } else {
1546           $filename = (split(/:/, $filedata, 3))[2];
1547         }
1548       }
1549     }
1550   }
1551   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1552     return ($streamname, $1);
1553   } else {
1554     return (undef, undef);
1555   }
1556 }
1557
1558 sub put_retry_count {
1559   # Calculate a --retries argument for arv-put that will have it try
1560   # approximately as long as this Job has been running.
1561   my $stoptime = shift || time;
1562   my $starttime = $jobstep[0]->{starttime};
1563   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1564   my $retries = 0;
1565   while ($timediff >= 2) {
1566     $retries++;
1567     $timediff /= 2;
1568   }
1569   return ($retries > 3) ? $retries : 3;
1570 }
1571
1572 sub exit_status_s {
1573   # Given a $?, return a human-readable exit code string like "0" or
1574   # "1" or "0 with signal 1" or "1 with signal 11".
1575   my $exitcode = shift;
1576   my $s = $exitcode >> 8;
1577   if ($exitcode & 0x7f) {
1578     $s .= " with signal " . ($exitcode & 0x7f);
1579   }
1580   if ($exitcode & 0x80) {
1581     $s .= " with core dump";
1582   }
1583   return $s;
1584 }
1585
1586 __DATA__
1587 #!/usr/bin/perl
1588
1589 # checkout-and-build
1590
1591 use Fcntl ':flock';
1592 use File::Path qw( make_path );
1593
1594 my $destdir = $ENV{"CRUNCH_SRC"};
1595 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1596 my $repo = $ENV{"CRUNCH_SRC_URL"};
1597 my $task_work = $ENV{"TASK_WORK"};
1598
1599 for my $dir ($destdir, $task_work) {
1600     if ($dir) {
1601         make_path $dir;
1602         -e $dir or die "Failed to create temporary directory ($dir): $!";
1603     }
1604 }
1605
1606 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1607 flock L, LOCK_EX;
1608 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1609     if (@ARGV) {
1610         exec(@ARGV);
1611         die "Cannot exec `@ARGV`: $!";
1612     } else {
1613         exit 0;
1614     }
1615 }
1616
1617 unlink "$destdir.commit";
1618 open STDOUT, ">", "$destdir.log";
1619 open STDERR, ">&STDOUT";
1620
1621 mkdir $destdir;
1622 my @git_archive_data = <DATA>;
1623 if (@git_archive_data) {
1624   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1625   print TARX @git_archive_data;
1626   if(!close(TARX)) {
1627     die "'tar -C $destdir -xf -' exited $?: $!";
1628   }
1629 }
1630
1631 my $pwd;
1632 chomp ($pwd = `pwd`);
1633 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1634 mkdir $install_dir;
1635
1636 for my $src_path ("$destdir/arvados/sdk/python") {
1637   if (-d $src_path) {
1638     shell_or_die ("virtualenv", $install_dir);
1639     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1640   }
1641 }
1642
1643 if (-e "$destdir/crunch_scripts/install") {
1644     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1645 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1646     # Old version
1647     shell_or_die ("./tests/autotests.sh", $install_dir);
1648 } elsif (-e "./install.sh") {
1649     shell_or_die ("./install.sh", $install_dir);
1650 }
1651
1652 if ($commit) {
1653     unlink "$destdir.commit.new";
1654     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1655     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1656 }
1657
1658 close L;
1659
1660 if (@ARGV) {
1661     exec(@ARGV);
1662     die "Cannot exec `@ARGV`: $!";
1663 } else {
1664     exit 0;
1665 }
1666
1667 sub shell_or_die
1668 {
1669   if ($ENV{"DEBUG"}) {
1670     print STDERR "@_\n";
1671   }
1672   system (@_) == 0
1673       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1674 }
1675
1676 __DATA__