8a80f9c1eee7bba1d3fd36c55218f669b968ee41
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path remove_tree );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $local_job = 0;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143 my $local_logfile;
144
145 my $User = $arv->{'users'}->{'current'}->execute;
146
147 my $Job;
148 my $job_id;
149 my $dbh;
150 my $sth;
151 if ($jobspec =~ /^[-a-z\d]+$/)
152 {
153   # $jobspec is an Arvados UUID, not a JSON job specification
154   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155   if (!$force_unlock) {
156     # If some other crunch-job process has grabbed this job (or we see
157     # other evidence that the job is already underway) we exit
158     # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159     # mark the job as failed.
160     if ($Job->{'is_locked_by_uuid'}) {
161       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
162       exit EX_TEMPFAIL;
163     }
164     if ($Job->{'state'} ne 'Queued') {
165       Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
166       exit EX_TEMPFAIL;
167     }
168     if ($Job->{'success'} ne undef) {
169       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
170       exit EX_TEMPFAIL;
171     }
172     if ($Job->{'running'}) {
173       Log(undef, "Job 'running' flag is already set");
174       exit EX_TEMPFAIL;
175     }
176     if ($Job->{'started_at'}) {
177       Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
178       exit EX_TEMPFAIL;
179     }
180   }
181 }
182 else
183 {
184   $Job = JSON::decode_json($jobspec);
185
186   if (!$resume_stash)
187   {
188     map { croak ("No $_ specified") unless $Job->{$_} }
189     qw(script script_version script_parameters);
190   }
191
192   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
193   $Job->{'started_at'} = gmtime;
194
195   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
196 }
197 $job_id = $Job->{'uuid'};
198
199 my $keep_logfile = $job_id . '.log.txt';
200 $local_logfile = File::Temp->new();
201
202 $Job->{'runtime_constraints'} ||= {};
203 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
204 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
205
206
207 Log (undef, "check slurm allocation");
208 my @slot;
209 my @node;
210 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
211 my @sinfo;
212 if (!$have_slurm)
213 {
214   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
215   push @sinfo, "$localcpus localhost";
216 }
217 if (exists $ENV{SLURM_NODELIST})
218 {
219   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
220 }
221 foreach (@sinfo)
222 {
223   my ($ncpus, $slurm_nodelist) = split;
224   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
225
226   my @nodelist;
227   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
228   {
229     my $nodelist = $1;
230     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
231     {
232       my $ranges = $1;
233       foreach (split (",", $ranges))
234       {
235         my ($a, $b);
236         if (/(\d+)-(\d+)/)
237         {
238           $a = $1;
239           $b = $2;
240         }
241         else
242         {
243           $a = $_;
244           $b = $_;
245         }
246         push @nodelist, map {
247           my $n = $nodelist;
248           $n =~ s/\[[-,\d]+\]/$_/;
249           $n;
250         } ($a..$b);
251       }
252     }
253     else
254     {
255       push @nodelist, $nodelist;
256     }
257   }
258   foreach my $nodename (@nodelist)
259   {
260     Log (undef, "node $nodename - $ncpus slots");
261     my $node = { name => $nodename,
262                  ncpus => $ncpus,
263                  losing_streak => 0,
264                  hold_until => 0 };
265     foreach my $cpu (1..$ncpus)
266     {
267       push @slot, { node => $node,
268                     cpu => $cpu };
269     }
270   }
271   push @node, @nodelist;
272 }
273
274
275
276 # Ensure that we get one jobstep running on each allocated node before
277 # we start overloading nodes with concurrent steps
278
279 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
280
281
282
283 my $jobmanager_id;
284 # Claim this job, and make sure nobody else does
285 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
286         $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
287   Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
288   exit EX_TEMPFAIL;
289 }
290 $Job->update_attributes('state' => 'Running',
291                         'tasks_summary' => { 'failed' => 0,
292                                              'todo' => 1,
293                                              'running' => 0,
294                                              'done' => 0 });
295
296
297 Log (undef, "start");
298 $SIG{'INT'} = sub { $main::please_freeze = 1; };
299 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
300 $SIG{'TERM'} = \&croak;
301 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
302 $SIG{'ALRM'} = sub { $main::please_info = 1; };
303 $SIG{'CONT'} = sub { $main::please_continue = 1; };
304 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
305
306 $main::please_freeze = 0;
307 $main::please_info = 0;
308 $main::please_continue = 0;
309 $main::please_refresh = 0;
310 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
311
312 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
313 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
314 $ENV{"JOB_UUID"} = $job_id;
315
316
317 my @jobstep;
318 my @jobstep_todo = ();
319 my @jobstep_done = ();
320 my @jobstep_tomerge = ();
321 my $jobstep_tomerge_level = 0;
322 my $squeue_checked;
323 my $squeue_kill_checked;
324 my $output_in_keep = 0;
325 my $latest_refresh = scalar time;
326
327
328
329 if (defined $Job->{thawedfromkey})
330 {
331   thaw ($Job->{thawedfromkey});
332 }
333 else
334 {
335   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
336     'job_uuid' => $Job->{'uuid'},
337     'sequence' => 0,
338     'qsequence' => 0,
339     'parameters' => {},
340                                                           });
341   push @jobstep, { 'level' => 0,
342                    'failures' => 0,
343                    'arvados_task' => $first_task,
344                  };
345   push @jobstep_todo, 0;
346 }
347
348
349 if (!$have_slurm)
350 {
351   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
352 }
353
354
355 my $build_script;
356 do {
357   local $/ = undef;
358   $build_script = <DATA>;
359 };
360 my $nodelist = join(",", @node);
361
362 if (!defined $no_clear_tmp) {
363   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
364   Log (undef, "Clean work dirs");
365
366   my $cleanpid = fork();
367   if ($cleanpid == 0)
368   {
369     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
370           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
371     exit (1);
372   }
373   while (1)
374   {
375     last if $cleanpid == waitpid (-1, WNOHANG);
376     freeze_if_want_freeze ($cleanpid);
377     select (undef, undef, undef, 0.1);
378   }
379   Log (undef, "Cleanup command exited ".exit_status_s($?));
380 }
381
382
383 my $git_archive;
384 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
385   # If we're in user-land (i.e., not called from crunch-dispatch)
386   # script_version can be an absolute directory path, signifying we
387   # should work straight out of that directory instead of using a git
388   # commit.
389   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
390   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
391 }
392 else {
393   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
394
395   # Install requested code version
396   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
397
398   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
399
400   # If we're running under crunch-dispatch, it will have already
401   # pulled the appropriate source tree into its own repository, and
402   # given us that repo's path as $git_dir.
403   #
404   # If we're running a "local" job, we might have to fetch content
405   # from a remote repository.
406   #
407   # (Currently crunch-dispatch gives a local path with --git-dir, but
408   # we might as well accept URLs there too in case it changes its
409   # mind.)
410   my $repo = $git_dir || $Job->{'repository'};
411
412   # Repository can be remote or local. If remote, we'll need to fetch it
413   # to a local dir before doing `git log` et al.
414   my $repo_location;
415
416   if ($repo =~ m{://|\@.*:}) {
417     # $repo is a git url we can clone, like git:// or https:// or
418     # file:/// or git@host:repo.git
419     $repo_location = 'remote';
420   } elsif ($repo =~ m{^\.*/}) {
421     # $repo is a local path to a git index. We'll also resolve ../foo
422     # to ../foo/.git if the latter is a directory.
423     if (-d "$repo/.git") {
424       $repo = "$repo/.git";
425     }
426     $repo_location = 'local';
427     Log(undef, "Using local repository '$repo'");
428   } else {
429     # $repo is none of the above. It must be the name of a hosted
430     # repository.
431     my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
432       'filters' => [['name','=',$repo]]
433         )->{'items'};
434     my $n_found = scalar @{$arv_repo_list};
435     if ($n_found > 0) {
436       Log(undef, "Repository '$repo' -> "
437           . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
438     }
439     if ($n_found != 1) {
440       croak("Error: Found $n_found repositories with name '$repo'.");
441     }
442     $repo = $arv_repo_list->[0]->{'fetch_url'};
443     $repo_location = 'remote';
444   }
445   Log(undef, "Using $repo_location repository '$repo'");
446   $ENV{"CRUNCH_SRC_URL"} = $repo;
447
448   # Resolve given script_version (we'll call that $treeish here) to a
449   # commit sha1 ($commit).
450   my $treeish = $Job->{'script_version'};
451   my $commit;
452   if ($repo_location eq 'remote') {
453     # We minimize excess object-fetching by re-using the same bare
454     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
455     # just keep adding remotes to it as needed.
456     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
457     my $gitcmd = "git --git-dir=\Q$local_repo\E";
458
459     # Set up our local repo for caching remote objects, making
460     # archives, etc.
461     if (!-d $local_repo) {
462       make_path($local_repo) or croak("Error: could not create $local_repo");
463     }
464     # This works (exits 0 and doesn't delete fetched objects) even
465     # if $local_repo is already initialized:
466     `$gitcmd init --bare`;
467     if ($?) {
468       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
469     }
470
471     # If $treeish looks like a hash (or abbrev hash) we look it up in
472     # our local cache first, since that's cheaper. (We don't want to
473     # do that with tags/branches though -- those change over time, so
474     # they should always be resolved by the remote repo.)
475     if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
476       # Hide stderr because it's normal for this to fail:
477       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
478       if ($? == 0 &&
479           $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
480           $sha1 =~ /^([0-9a-f]{40})$/s) {
481         $commit = $1;
482         Log(undef, "Commit $commit already present in $local_repo");
483       }
484     }
485
486     if (!defined $commit) {
487       # If $treeish isn't just a hash or abbrev hash, or isn't here
488       # yet, we need to fetch the remote to resolve it correctly.
489
490       # First, remove all local heads. This prevents a name that does
491       # not exist on the remote from resolving to (or colliding with)
492       # a previously fetched branch or tag (possibly from a different
493       # remote).
494       remove_tree("$local_repo/refs/heads", {keep_root => 1});
495
496       Log(undef, "Fetching objects from $repo to $local_repo");
497       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
498       if ($?) {
499         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
500       }
501     }
502
503     # Now that the data is all here, we will use our local repo for
504     # the rest of our git activities.
505     $repo = $local_repo;
506   }
507
508   my $gitcmd = "git --git-dir=\Q$repo\E";
509   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
510   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
511     croak("`$gitcmd rev-list` exited "
512           .exit_status_s($?)
513           .", '$treeish' not found. Giving up.");
514   }
515   $commit = $1;
516   Log(undef, "Version $treeish is commit $commit");
517
518   if ($commit ne $Job->{'script_version'}) {
519     # Record the real commit id in the database, frozentokey, logs,
520     # etc. -- instead of an abbreviation or a branch name which can
521     # become ambiguous or point to a different commit in the future.
522     if (!$Job->update_attributes('script_version' => $commit)) {
523       croak("Error: failed to update job's script_version attribute");
524     }
525   }
526
527   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
528   $git_archive = `$gitcmd archive ''\Q$commit\E`;
529   if ($?) {
530     croak("Error: $gitcmd archive exited ".exit_status_s($?));
531   }
532 }
533
534 if (!defined $git_archive) {
535   Log(undef, "Skip install phase (no git archive)");
536   if ($have_slurm) {
537     Log(undef, "Warning: This probably means workers have no source tree!");
538   }
539 }
540 else {
541   Log(undef, "Run install script on all workers");
542
543   my @srunargs = ("srun",
544                   "--nodelist=$nodelist",
545                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
546   my @execargs = ("sh", "-c",
547                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
548
549   # Note: this section is almost certainly unnecessary if we're
550   # running tasks in docker containers.
551   my $installpid = fork();
552   if ($installpid == 0)
553   {
554     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
555     exit (1);
556   }
557   while (1)
558   {
559     last if $installpid == waitpid (-1, WNOHANG);
560     freeze_if_want_freeze ($installpid);
561     select (undef, undef, undef, 0.1);
562   }
563   Log (undef, "Install script exited ".exit_status_s($?));
564 }
565
566 if (!$have_slurm)
567 {
568   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
569   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
570 }
571
572 # If this job requires a Docker image, install that.
573 my $docker_bin = "/usr/bin/docker.io";
574 my ($docker_locator, $docker_stream, $docker_hash);
575 if ($docker_locator = $Job->{docker_image_locator}) {
576   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
577   if (!$docker_hash)
578   {
579     croak("No Docker image hash found from locator $docker_locator");
580   }
581   $docker_stream =~ s/^\.//;
582   my $docker_install_script = qq{
583 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
584     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
585 fi
586 };
587   my $docker_pid = fork();
588   if ($docker_pid == 0)
589   {
590     srun (["srun", "--nodelist=" . join(',', @node)],
591           ["/bin/sh", "-ec", $docker_install_script]);
592     exit ($?);
593   }
594   while (1)
595   {
596     last if $docker_pid == waitpid (-1, WNOHANG);
597     freeze_if_want_freeze ($docker_pid);
598     select (undef, undef, undef, 0.1);
599   }
600   if ($? != 0)
601   {
602     croak("Installing Docker image from $docker_locator exited "
603           .exit_status_s($?));
604   }
605 }
606
607 foreach (qw (script script_version script_parameters runtime_constraints))
608 {
609   Log (undef,
610        "$_ " .
611        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
612 }
613 foreach (split (/\n/, $Job->{knobs}))
614 {
615   Log (undef, "knob " . $_);
616 }
617
618
619
620 $main::success = undef;
621
622
623
624 ONELEVEL:
625
626 my $thisround_succeeded = 0;
627 my $thisround_failed = 0;
628 my $thisround_failed_multiple = 0;
629
630 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
631                        or $a <=> $b } @jobstep_todo;
632 my $level = $jobstep[$jobstep_todo[0]]->{level};
633 Log (undef, "start level $level");
634
635
636
637 my %proc;
638 my @freeslot = (0..$#slot);
639 my @holdslot;
640 my %reader;
641 my $progress_is_dirty = 1;
642 my $progress_stats_updated = 0;
643
644 update_progress_stats();
645
646
647
648 THISROUND:
649 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
650 {
651   my $id = $jobstep_todo[$todo_ptr];
652   my $Jobstep = $jobstep[$id];
653   if ($Jobstep->{level} != $level)
654   {
655     next;
656   }
657
658   pipe $reader{$id}, "writer" or croak ($!);
659   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
660   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
661
662   my $childslot = $freeslot[0];
663   my $childnode = $slot[$childslot]->{node};
664   my $childslotname = join (".",
665                             $slot[$childslot]->{node}->{name},
666                             $slot[$childslot]->{cpu});
667   my $childpid = fork();
668   if ($childpid == 0)
669   {
670     $SIG{'INT'} = 'DEFAULT';
671     $SIG{'QUIT'} = 'DEFAULT';
672     $SIG{'TERM'} = 'DEFAULT';
673
674     foreach (values (%reader))
675     {
676       close($_);
677     }
678     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
679     open(STDOUT,">&writer");
680     open(STDERR,">&writer");
681
682     undef $dbh;
683     undef $sth;
684
685     delete $ENV{"GNUPGHOME"};
686     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
687     $ENV{"TASK_QSEQUENCE"} = $id;
688     $ENV{"TASK_SEQUENCE"} = $level;
689     $ENV{"JOB_SCRIPT"} = $Job->{script};
690     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
691       $param =~ tr/a-z/A-Z/;
692       $ENV{"JOB_PARAMETER_$param"} = $value;
693     }
694     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
695     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
696     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
697     $ENV{"HOME"} = $ENV{"TASK_WORK"};
698     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
699     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
700     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
701     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
702
703     $ENV{"GZIP"} = "-n";
704
705     my @srunargs = (
706       "srun",
707       "--nodelist=".$childnode->{name},
708       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
709       "--job-name=$job_id.$id.$$",
710         );
711     my $build_script_to_send = "";
712     my $command =
713         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
714         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
715         ."&& cd $ENV{CRUNCH_TMP} ";
716     if ($build_script)
717     {
718       $build_script_to_send = $build_script;
719       $command .=
720           "&& perl -";
721     }
722     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
723     if ($docker_hash)
724     {
725       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
726       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
727       # Dynamically configure the container to use the host system as its
728       # DNS server.  Get the host's global addresses from the ip command,
729       # and turn them into docker --dns options using gawk.
730       $command .=
731           q{$(ip -o address show scope global |
732               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
733       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
734       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
735       $command .= "--env=\QHOME=/home/crunch\E ";
736       while (my ($env_key, $env_val) = each %ENV)
737       {
738         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
739           if ($env_key eq "TASK_WORK") {
740             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
741           }
742           elsif ($env_key eq "TASK_KEEPMOUNT") {
743             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
744           }
745           else {
746             $command .= "--env=\Q$env_key=$env_val\E ";
747           }
748         }
749       }
750       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
751       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
752       $command .= "\Q$docker_hash\E ";
753       $command .= "stdbuf --output=0 --error=0 ";
754       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
755     } else {
756       # Non-docker run
757       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
758       $command .= "stdbuf --output=0 --error=0 ";
759       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
760     }
761
762     my @execargs = ('bash', '-c', $command);
763     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
764     # exec() failed, we assume nothing happened.
765     Log(undef, "srun() failed on build script");
766     die;
767   }
768   close("writer");
769   if (!defined $childpid)
770   {
771     close $reader{$id};
772     delete $reader{$id};
773     next;
774   }
775   shift @freeslot;
776   $proc{$childpid} = { jobstep => $id,
777                        time => time,
778                        slot => $childslot,
779                        jobstepname => "$job_id.$id.$childpid",
780                      };
781   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
782   $slot[$childslot]->{pid} = $childpid;
783
784   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
785   Log ($id, "child $childpid started on $childslotname");
786   $Jobstep->{starttime} = time;
787   $Jobstep->{node} = $childnode->{name};
788   $Jobstep->{slotindex} = $childslot;
789   delete $Jobstep->{stderr};
790   delete $Jobstep->{finishtime};
791
792   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
793   $Jobstep->{'arvados_task'}->save;
794
795   splice @jobstep_todo, $todo_ptr, 1;
796   --$todo_ptr;
797
798   $progress_is_dirty = 1;
799
800   while (!@freeslot
801          ||
802          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
803   {
804     last THISROUND if $main::please_freeze;
805     if ($main::please_info)
806     {
807       $main::please_info = 0;
808       freeze();
809       collate_output();
810       save_meta(1);
811       update_progress_stats();
812     }
813     my $gotsome
814         = readfrompipes ()
815         + reapchildren ();
816     if (!$gotsome)
817     {
818       check_refresh_wanted();
819       check_squeue();
820       update_progress_stats();
821       select (undef, undef, undef, 0.1);
822     }
823     elsif (time - $progress_stats_updated >= 30)
824     {
825       update_progress_stats();
826     }
827     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
828         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
829     {
830       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
831           .($thisround_failed+$thisround_succeeded)
832           .") -- giving up on this round";
833       Log (undef, $message);
834       last THISROUND;
835     }
836
837     # move slots from freeslot to holdslot (or back to freeslot) if necessary
838     for (my $i=$#freeslot; $i>=0; $i--) {
839       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
840         push @holdslot, (splice @freeslot, $i, 1);
841       }
842     }
843     for (my $i=$#holdslot; $i>=0; $i--) {
844       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
845         push @freeslot, (splice @holdslot, $i, 1);
846       }
847     }
848
849     # give up if no nodes are succeeding
850     if (!grep { $_->{node}->{losing_streak} == 0 &&
851                     $_->{node}->{hold_count} < 4 } @slot) {
852       my $message = "Every node has failed -- giving up on this round";
853       Log (undef, $message);
854       last THISROUND;
855     }
856   }
857 }
858
859
860 push @freeslot, splice @holdslot;
861 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
862
863
864 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
865 while (%proc)
866 {
867   if ($main::please_continue) {
868     $main::please_continue = 0;
869     goto THISROUND;
870   }
871   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
872   readfrompipes ();
873   if (!reapchildren())
874   {
875     check_refresh_wanted();
876     check_squeue();
877     update_progress_stats();
878     select (undef, undef, undef, 0.1);
879     killem (keys %proc) if $main::please_freeze;
880   }
881 }
882
883 update_progress_stats();
884 freeze_if_want_freeze();
885
886
887 if (!defined $main::success)
888 {
889   if (@jobstep_todo &&
890       $thisround_succeeded == 0 &&
891       ($thisround_failed == 0 || $thisround_failed > 4))
892   {
893     my $message = "stop because $thisround_failed tasks failed and none succeeded";
894     Log (undef, $message);
895     $main::success = 0;
896   }
897   if (!@jobstep_todo)
898   {
899     $main::success = 1;
900   }
901 }
902
903 goto ONELEVEL if !defined $main::success;
904
905
906 release_allocation();
907 freeze();
908 my $collated_output = &collate_output();
909
910 if (!$collated_output) {
911   Log(undef, "output undef");
912 }
913 else {
914   eval {
915     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
916         or die "failed to get collated manifest: $!";
917     my $orig_manifest_text = '';
918     while (my $manifest_line = <$orig_manifest>) {
919       $orig_manifest_text .= $manifest_line;
920     }
921     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
922       'manifest_text' => $orig_manifest_text,
923     });
924     Log(undef, "output uuid " . $output->{uuid});
925     Log(undef, "output hash " . $output->{portable_data_hash});
926     $Job->update_attributes('output' => $output->{portable_data_hash});
927   };
928   if ($@) {
929     Log (undef, "Failed to register output manifest: $@");
930   }
931 }
932
933 Log (undef, "finish");
934
935 save_meta();
936
937 my $final_state;
938 if ($collated_output && $main::success) {
939   $final_state = 'Complete';
940 } else {
941   $final_state = 'Failed';
942 }
943 $Job->update_attributes('state' => $final_state);
944
945 exit (($final_state eq 'Complete') ? 0 : 1);
946
947
948
949 sub update_progress_stats
950 {
951   $progress_stats_updated = time;
952   return if !$progress_is_dirty;
953   my ($todo, $done, $running) = (scalar @jobstep_todo,
954                                  scalar @jobstep_done,
955                                  scalar @slot - scalar @freeslot - scalar @holdslot);
956   $Job->{'tasks_summary'} ||= {};
957   $Job->{'tasks_summary'}->{'todo'} = $todo;
958   $Job->{'tasks_summary'}->{'done'} = $done;
959   $Job->{'tasks_summary'}->{'running'} = $running;
960   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
961   Log (undef, "status: $done done, $running running, $todo todo");
962   $progress_is_dirty = 0;
963 }
964
965
966
967 sub reapchildren
968 {
969   my $pid = waitpid (-1, WNOHANG);
970   return 0 if $pid <= 0;
971
972   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
973                   . "."
974                   . $slot[$proc{$pid}->{slot}]->{cpu});
975   my $jobstepid = $proc{$pid}->{jobstep};
976   my $elapsed = time - $proc{$pid}->{time};
977   my $Jobstep = $jobstep[$jobstepid];
978
979   my $childstatus = $?;
980   my $exitvalue = $childstatus >> 8;
981   my $exitinfo = "exit ".exit_status_s($childstatus);
982   $Jobstep->{'arvados_task'}->reload;
983   my $task_success = $Jobstep->{'arvados_task'}->{success};
984
985   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
986
987   if (!defined $task_success) {
988     # task did not indicate one way or the other --> fail
989     $Jobstep->{'arvados_task'}->{success} = 0;
990     $Jobstep->{'arvados_task'}->save;
991     $task_success = 0;
992   }
993
994   if (!$task_success)
995   {
996     my $temporary_fail;
997     $temporary_fail ||= $Jobstep->{node_fail};
998     $temporary_fail ||= ($exitvalue == 111);
999
1000     ++$thisround_failed;
1001     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1002
1003     # Check for signs of a failed or misconfigured node
1004     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1005         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1006       # Don't count this against jobstep failure thresholds if this
1007       # node is already suspected faulty and srun exited quickly
1008       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1009           $elapsed < 5) {
1010         Log ($jobstepid, "blaming failure on suspect node " .
1011              $slot[$proc{$pid}->{slot}]->{node}->{name});
1012         $temporary_fail ||= 1;
1013       }
1014       ban_node_by_slot($proc{$pid}->{slot});
1015     }
1016
1017     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1018                              ++$Jobstep->{'failures'},
1019                              $temporary_fail ? 'temporary ' : 'permanent',
1020                              $elapsed));
1021
1022     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1023       # Give up on this task, and the whole job
1024       $main::success = 0;
1025       $main::please_freeze = 1;
1026     }
1027     # Put this task back on the todo queue
1028     push @jobstep_todo, $jobstepid;
1029     $Job->{'tasks_summary'}->{'failed'}++;
1030   }
1031   else
1032   {
1033     ++$thisround_succeeded;
1034     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1035     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1036     push @jobstep_done, $jobstepid;
1037     Log ($jobstepid, "success in $elapsed seconds");
1038   }
1039   $Jobstep->{exitcode} = $childstatus;
1040   $Jobstep->{finishtime} = time;
1041   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1042   $Jobstep->{'arvados_task'}->save;
1043   process_stderr ($jobstepid, $task_success);
1044   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1045
1046   close $reader{$jobstepid};
1047   delete $reader{$jobstepid};
1048   delete $slot[$proc{$pid}->{slot}]->{pid};
1049   push @freeslot, $proc{$pid}->{slot};
1050   delete $proc{$pid};
1051
1052   if ($task_success) {
1053     # Load new tasks
1054     my $newtask_list = [];
1055     my $newtask_results;
1056     do {
1057       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1058         'where' => {
1059           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1060         },
1061         'order' => 'qsequence',
1062         'offset' => scalar(@$newtask_list),
1063       );
1064       push(@$newtask_list, @{$newtask_results->{items}});
1065     } while (@{$newtask_results->{items}});
1066     foreach my $arvados_task (@$newtask_list) {
1067       my $jobstep = {
1068         'level' => $arvados_task->{'sequence'},
1069         'failures' => 0,
1070         'arvados_task' => $arvados_task
1071       };
1072       push @jobstep, $jobstep;
1073       push @jobstep_todo, $#jobstep;
1074     }
1075   }
1076
1077   $progress_is_dirty = 1;
1078   1;
1079 }
1080
1081 sub check_refresh_wanted
1082 {
1083   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1084   if (@stat && $stat[9] > $latest_refresh) {
1085     $latest_refresh = scalar time;
1086     my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1087     for my $attr ('cancelled_at',
1088                   'cancelled_by_user_uuid',
1089                   'cancelled_by_client_uuid',
1090                   'state') {
1091       $Job->{$attr} = $Job2->{$attr};
1092     }
1093     if ($Job->{'state'} ne "Running") {
1094       if ($Job->{'state'} eq "Cancelled") {
1095         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1096       } else {
1097         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1098       }
1099       $main::success = 0;
1100       $main::please_freeze = 1;
1101     }
1102   }
1103 }
1104
1105 sub check_squeue
1106 {
1107   # return if the kill list was checked <4 seconds ago
1108   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1109   {
1110     return;
1111   }
1112   $squeue_kill_checked = time;
1113
1114   # use killem() on procs whose killtime is reached
1115   for (keys %proc)
1116   {
1117     if (exists $proc{$_}->{killtime}
1118         && $proc{$_}->{killtime} <= time)
1119     {
1120       killem ($_);
1121     }
1122   }
1123
1124   # return if the squeue was checked <60 seconds ago
1125   if (defined $squeue_checked && $squeue_checked > time - 60)
1126   {
1127     return;
1128   }
1129   $squeue_checked = time;
1130
1131   if (!$have_slurm)
1132   {
1133     # here is an opportunity to check for mysterious problems with local procs
1134     return;
1135   }
1136
1137   # get a list of steps still running
1138   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1139   chop @squeue;
1140   if ($squeue[-1] ne "ok")
1141   {
1142     return;
1143   }
1144   pop @squeue;
1145
1146   # which of my jobsteps are running, according to squeue?
1147   my %ok;
1148   foreach (@squeue)
1149   {
1150     if (/^(\d+)\.(\d+) (\S+)/)
1151     {
1152       if ($1 eq $ENV{SLURM_JOBID})
1153       {
1154         $ok{$3} = 1;
1155       }
1156     }
1157   }
1158
1159   # which of my active child procs (>60s old) were not mentioned by squeue?
1160   foreach (keys %proc)
1161   {
1162     if ($proc{$_}->{time} < time - 60
1163         && !exists $ok{$proc{$_}->{jobstepname}}
1164         && !exists $proc{$_}->{killtime})
1165     {
1166       # kill this proc if it hasn't exited in 30 seconds
1167       $proc{$_}->{killtime} = time + 30;
1168     }
1169   }
1170 }
1171
1172
1173 sub release_allocation
1174 {
1175   if ($have_slurm)
1176   {
1177     Log (undef, "release job allocation");
1178     system "scancel $ENV{SLURM_JOBID}";
1179   }
1180 }
1181
1182
1183 sub readfrompipes
1184 {
1185   my $gotsome = 0;
1186   foreach my $job (keys %reader)
1187   {
1188     my $buf;
1189     while (0 < sysread ($reader{$job}, $buf, 8192))
1190     {
1191       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1192       $jobstep[$job]->{stderr} .= $buf;
1193       preprocess_stderr ($job);
1194       if (length ($jobstep[$job]->{stderr}) > 16384)
1195       {
1196         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1197       }
1198       $gotsome = 1;
1199     }
1200   }
1201   return $gotsome;
1202 }
1203
1204
1205 sub preprocess_stderr
1206 {
1207   my $job = shift;
1208
1209   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1210     my $line = $1;
1211     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1212     Log ($job, "stderr $line");
1213     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1214       # whoa.
1215       $main::please_freeze = 1;
1216     }
1217     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1218       $jobstep[$job]->{node_fail} = 1;
1219       ban_node_by_slot($jobstep[$job]->{slotindex});
1220     }
1221   }
1222 }
1223
1224
1225 sub process_stderr
1226 {
1227   my $job = shift;
1228   my $task_success = shift;
1229   preprocess_stderr ($job);
1230
1231   map {
1232     Log ($job, "stderr $_");
1233   } split ("\n", $jobstep[$job]->{stderr});
1234 }
1235
1236 sub fetch_block
1237 {
1238   my $hash = shift;
1239   my ($keep, $child_out, $output_block);
1240
1241   my $cmd = "arv-get \Q$hash\E";
1242   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1243   $output_block = '';
1244   while (1) {
1245     my $buf;
1246     my $bytes = sysread($keep, $buf, 1024 * 1024);
1247     if (!defined $bytes) {
1248       die "reading from arv-get: $!";
1249     } elsif ($bytes == 0) {
1250       # sysread returns 0 at the end of the pipe.
1251       last;
1252     } else {
1253       # some bytes were read into buf.
1254       $output_block .= $buf;
1255     }
1256   }
1257   close $keep;
1258   return $output_block;
1259 }
1260
1261 sub collate_output
1262 {
1263   Log (undef, "collate");
1264
1265   my ($child_out, $child_in);
1266   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1267                   '--retries', put_retry_count());
1268   my $joboutput;
1269   for (@jobstep)
1270   {
1271     next if (!exists $_->{'arvados_task'}->{'output'} ||
1272              !$_->{'arvados_task'}->{'success'});
1273     my $output = $_->{'arvados_task'}->{output};
1274     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1275     {
1276       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1277       print $child_in $output;
1278     }
1279     elsif (@jobstep == 1)
1280     {
1281       $joboutput = $output;
1282       last;
1283     }
1284     elsif (defined (my $outblock = fetch_block ($output)))
1285     {
1286       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1287       print $child_in $outblock;
1288     }
1289     else
1290     {
1291       Log (undef, "XXX fetch_block($output) failed XXX");
1292       $main::success = 0;
1293     }
1294   }
1295   $child_in->close;
1296
1297   if (!defined $joboutput) {
1298     my $s = IO::Select->new($child_out);
1299     if ($s->can_read(120)) {
1300       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1301       chomp($joboutput);
1302       # TODO: Ensure exit status == 0.
1303     } else {
1304       Log (undef, "timed out reading from 'arv-put'");
1305     }
1306   }
1307   # TODO: kill $pid instead of waiting, now that we've decided to
1308   # ignore further output.
1309   waitpid($pid, 0);
1310
1311   return $joboutput;
1312 }
1313
1314
1315 sub killem
1316 {
1317   foreach (@_)
1318   {
1319     my $sig = 2;                # SIGINT first
1320     if (exists $proc{$_}->{"sent_$sig"} &&
1321         time - $proc{$_}->{"sent_$sig"} > 4)
1322     {
1323       $sig = 15;                # SIGTERM if SIGINT doesn't work
1324     }
1325     if (exists $proc{$_}->{"sent_$sig"} &&
1326         time - $proc{$_}->{"sent_$sig"} > 4)
1327     {
1328       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1329     }
1330     if (!exists $proc{$_}->{"sent_$sig"})
1331     {
1332       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1333       kill $sig, $_;
1334       select (undef, undef, undef, 0.1);
1335       if ($sig == 2)
1336       {
1337         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1338       }
1339       $proc{$_}->{"sent_$sig"} = time;
1340       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1341     }
1342   }
1343 }
1344
1345
1346 sub fhbits
1347 {
1348   my($bits);
1349   for (@_) {
1350     vec($bits,fileno($_),1) = 1;
1351   }
1352   $bits;
1353 }
1354
1355
1356 sub Log                         # ($jobstep_id, $logmessage)
1357 {
1358   if ($_[1] =~ /\n/) {
1359     for my $line (split (/\n/, $_[1])) {
1360       Log ($_[0], $line);
1361     }
1362     return;
1363   }
1364   my $fh = select STDERR; $|=1; select $fh;
1365   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1366   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1367   $message .= "\n";
1368   my $datetime;
1369   if ($local_logfile || -t STDERR) {
1370     my @gmtime = gmtime;
1371     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1372                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1373   }
1374   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1375
1376   if ($local_logfile) {
1377     print $local_logfile $datetime . " " . $message;
1378   }
1379 }
1380
1381
1382 sub croak
1383 {
1384   my ($package, $file, $line) = caller;
1385   my $message = "@_ at $file line $line\n";
1386   Log (undef, $message);
1387   freeze() if @jobstep_todo;
1388   collate_output() if @jobstep_todo;
1389   cleanup() if $Job;
1390   save_meta() if $local_logfile;
1391   die;
1392 }
1393
1394
1395 sub cleanup
1396 {
1397   if ($Job->{'state'} eq 'Cancelled') {
1398     $Job->update_attributes('finished_at' => scalar gmtime);
1399   } else {
1400     $Job->update_attributes('state' => 'Failed');
1401   }
1402 }
1403
1404
1405 sub save_meta
1406 {
1407   my $justcheckpoint = shift; # false if this will be the last meta saved
1408   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1409
1410   $local_logfile->flush;
1411   my $retry_count = put_retry_count();
1412   my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1413       "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1414   my $loglocator = `$cmd`;
1415   die "system $cmd exited ".exit_status_s($?) if $?;
1416   chomp($loglocator);
1417
1418   $local_logfile = undef;   # the temp file is automatically deleted
1419   Log (undef, "log manifest is $loglocator");
1420   $Job->{'log'} = $loglocator;
1421   $Job->update_attributes('log', $loglocator);
1422 }
1423
1424
1425 sub freeze_if_want_freeze
1426 {
1427   if ($main::please_freeze)
1428   {
1429     release_allocation();
1430     if (@_)
1431     {
1432       # kill some srun procs before freeze+stop
1433       map { $proc{$_} = {} } @_;
1434       while (%proc)
1435       {
1436         killem (keys %proc);
1437         select (undef, undef, undef, 0.1);
1438         my $died;
1439         while (($died = waitpid (-1, WNOHANG)) > 0)
1440         {
1441           delete $proc{$died};
1442         }
1443       }
1444     }
1445     freeze();
1446     collate_output();
1447     cleanup();
1448     save_meta();
1449     exit 1;
1450   }
1451 }
1452
1453
1454 sub freeze
1455 {
1456   Log (undef, "Freeze not implemented");
1457   return;
1458 }
1459
1460
1461 sub thaw
1462 {
1463   croak ("Thaw not implemented");
1464 }
1465
1466
1467 sub freezequote
1468 {
1469   my $s = shift;
1470   $s =~ s/\\/\\\\/g;
1471   $s =~ s/\n/\\n/g;
1472   return $s;
1473 }
1474
1475
1476 sub freezeunquote
1477 {
1478   my $s = shift;
1479   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1480   return $s;
1481 }
1482
1483
1484 sub srun
1485 {
1486   my $srunargs = shift;
1487   my $execargs = shift;
1488   my $opts = shift || {};
1489   my $stdin = shift;
1490   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1491   print STDERR (join (" ",
1492                       map { / / ? "'$_'" : $_ }
1493                       (@$args)),
1494                 "\n")
1495       if $ENV{CRUNCH_DEBUG};
1496
1497   if (defined $stdin) {
1498     my $child = open STDIN, "-|";
1499     defined $child or die "no fork: $!";
1500     if ($child == 0) {
1501       print $stdin or die $!;
1502       close STDOUT or die $!;
1503       exit 0;
1504     }
1505   }
1506
1507   return system (@$args) if $opts->{fork};
1508
1509   exec @$args;
1510   warn "ENV size is ".length(join(" ",%ENV));
1511   die "exec failed: $!: @$args";
1512 }
1513
1514
1515 sub ban_node_by_slot {
1516   # Don't start any new jobsteps on this node for 60 seconds
1517   my $slotid = shift;
1518   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1519   $slot[$slotid]->{node}->{hold_count}++;
1520   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1521 }
1522
1523 sub must_lock_now
1524 {
1525   my ($lockfile, $error_message) = @_;
1526   open L, ">", $lockfile or croak("$lockfile: $!");
1527   if (!flock L, LOCK_EX|LOCK_NB) {
1528     croak("Can't lock $lockfile: $error_message\n");
1529   }
1530 }
1531
1532 sub find_docker_image {
1533   # Given a Keep locator, check to see if it contains a Docker image.
1534   # If so, return its stream name and Docker hash.
1535   # If not, return undef for both values.
1536   my $locator = shift;
1537   my ($streamname, $filename);
1538   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1539     foreach my $line (split(/\n/, $image->{manifest_text})) {
1540       my @tokens = split(/\s+/, $line);
1541       next if (!@tokens);
1542       $streamname = shift(@tokens);
1543       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1544         if (defined($filename)) {
1545           return (undef, undef);  # More than one file in the Collection.
1546         } else {
1547           $filename = (split(/:/, $filedata, 3))[2];
1548         }
1549       }
1550     }
1551   }
1552   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1553     return ($streamname, $1);
1554   } else {
1555     return (undef, undef);
1556   }
1557 }
1558
1559 sub put_retry_count {
1560   # Calculate a --retries argument for arv-put that will have it try
1561   # approximately as long as this Job has been running.
1562   my $stoptime = shift || time;
1563   my $starttime = $jobstep[0]->{starttime};
1564   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1565   my $retries = 0;
1566   while ($timediff >= 2) {
1567     $retries++;
1568     $timediff /= 2;
1569   }
1570   return ($retries > 3) ? $retries : 3;
1571 }
1572
1573 sub exit_status_s {
1574   # Given a $?, return a human-readable exit code string like "0" or
1575   # "1" or "0 with signal 1" or "1 with signal 11".
1576   my $exitcode = shift;
1577   my $s = $exitcode >> 8;
1578   if ($exitcode & 0x7f) {
1579     $s .= " with signal " . ($exitcode & 0x7f);
1580   }
1581   if ($exitcode & 0x80) {
1582     $s .= " with core dump";
1583   }
1584   return $s;
1585 }
1586
1587 __DATA__
1588 #!/usr/bin/perl
1589
1590 # checkout-and-build
1591
1592 use Fcntl ':flock';
1593 use File::Path qw( make_path );
1594
1595 my $destdir = $ENV{"CRUNCH_SRC"};
1596 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1597 my $repo = $ENV{"CRUNCH_SRC_URL"};
1598 my $task_work = $ENV{"TASK_WORK"};
1599
1600 for my $dir ($destdir, $task_work) {
1601     if ($dir) {
1602         make_path $dir;
1603         -e $dir or die "Failed to create temporary directory ($dir): $!";
1604     }
1605 }
1606
1607 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1608 flock L, LOCK_EX;
1609 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1610     if (@ARGV) {
1611         exec(@ARGV);
1612         die "Cannot exec `@ARGV`: $!";
1613     } else {
1614         exit 0;
1615     }
1616 }
1617
1618 unlink "$destdir.commit";
1619 open STDOUT, ">", "$destdir.log";
1620 open STDERR, ">&STDOUT";
1621
1622 mkdir $destdir;
1623 my @git_archive_data = <DATA>;
1624 if (@git_archive_data) {
1625   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1626   print TARX @git_archive_data;
1627   if(!close(TARX)) {
1628     die "'tar -C $destdir -xf -' exited $?: $!";
1629   }
1630 }
1631
1632 my $pwd;
1633 chomp ($pwd = `pwd`);
1634 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1635 mkdir $install_dir;
1636
1637 for my $src_path ("$destdir/arvados/sdk/python") {
1638   if (-d $src_path) {
1639     shell_or_die ("virtualenv", $install_dir);
1640     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1641   }
1642 }
1643
1644 if (-e "$destdir/crunch_scripts/install") {
1645     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1646 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1647     # Old version
1648     shell_or_die ("./tests/autotests.sh", $install_dir);
1649 } elsif (-e "./install.sh") {
1650     shell_or_die ("./install.sh", $install_dir);
1651 }
1652
1653 if ($commit) {
1654     unlink "$destdir.commit.new";
1655     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1656     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1657 }
1658
1659 close L;
1660
1661 if (@ARGV) {
1662     exec(@ARGV);
1663     die "Cannot exec `@ARGV`: $!";
1664 } else {
1665     exit 0;
1666 }
1667
1668 sub shell_or_die
1669 {
1670   if ($ENV{"DEBUG"}) {
1671     print STDERR "@_\n";
1672   }
1673   system (@_) == 0
1674       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1675 }
1676
1677 __DATA__