3775: Update comment
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Digest::MD5 qw(md5_hex);
90 use Getopt::Long;
91 use IPC::Open2;
92 use IO::Select;
93 use File::Temp;
94 use Fcntl ':flock';
95 use File::Path qw( make_path remove_tree );
96
97 use constant EX_TEMPFAIL => 75;
98
99 $ENV{"TMPDIR"} ||= "/tmp";
100 unless (defined $ENV{"CRUNCH_TMP"}) {
101   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
102   if ($ENV{"USER"} ne "crunch" && $< != 0) {
103     # use a tmp dir unique for my uid
104     $ENV{"CRUNCH_TMP"} .= "-$<";
105   }
106 }
107
108 # Create the tmp directory if it does not exist
109 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
110   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
111 }
112
113 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
114 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
115 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
116 mkdir ($ENV{"JOB_WORK"});
117
118 my $force_unlock;
119 my $git_dir;
120 my $jobspec;
121 my $job_api_token;
122 my $no_clear_tmp;
123 my $resume_stash;
124 GetOptions('force-unlock' => \$force_unlock,
125            'git-dir=s' => \$git_dir,
126            'job=s' => \$jobspec,
127            'job-api-token=s' => \$job_api_token,
128            'no-clear-tmp' => \$no_clear_tmp,
129            'resume-stash=s' => \$resume_stash,
130     );
131
132 if (defined $job_api_token) {
133   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
134 }
135
136 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
137 my $local_job = 0;
138
139
140 $SIG{'USR1'} = sub
141 {
142   $main::ENV{CRUNCH_DEBUG} = 1;
143 };
144 $SIG{'USR2'} = sub
145 {
146   $main::ENV{CRUNCH_DEBUG} = 0;
147 };
148
149
150
151 my $arv = Arvados->new('apiVersion' => 'v1');
152
153 my $User = $arv->{'users'}->{'current'}->execute;
154
155 my $Job;
156 my $job_id;
157 my $dbh;
158 my $sth;
159 if ($jobspec =~ /^[-a-z\d]+$/)
160 {
161   # $jobspec is an Arvados UUID, not a JSON job specification
162   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
163   if (!$force_unlock) {
164     # Claim this job, and make sure nobody else does
165     eval {
166       # lock() sets is_locked_by_uuid and changes state to Running.
167       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
168     };
169     if ($@) {
170       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
171       exit EX_TEMPFAIL;
172     };
173   }
174 }
175 else
176 {
177   $Job = JSON::decode_json($jobspec);
178
179   if (!$resume_stash)
180   {
181     map { croak ("No $_ specified") unless $Job->{$_} }
182     qw(script script_version script_parameters);
183   }
184
185   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186   $Job->{'started_at'} = gmtime;
187   $Job->{'state'} = 'Running';
188
189   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
190 }
191 $job_id = $Job->{'uuid'};
192
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
195
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
199
200
201 Log (undef, "check slurm allocation");
202 my @slot;
203 my @node;
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
205 my @sinfo;
206 if (!$have_slurm)
207 {
208   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209   push @sinfo, "$localcpus localhost";
210 }
211 if (exists $ENV{SLURM_NODELIST})
212 {
213   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
214 }
215 foreach (@sinfo)
216 {
217   my ($ncpus, $slurm_nodelist) = split;
218   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
219
220   my @nodelist;
221   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
222   {
223     my $nodelist = $1;
224     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
225     {
226       my $ranges = $1;
227       foreach (split (",", $ranges))
228       {
229         my ($a, $b);
230         if (/(\d+)-(\d+)/)
231         {
232           $a = $1;
233           $b = $2;
234         }
235         else
236         {
237           $a = $_;
238           $b = $_;
239         }
240         push @nodelist, map {
241           my $n = $nodelist;
242           $n =~ s/\[[-,\d]+\]/$_/;
243           $n;
244         } ($a..$b);
245       }
246     }
247     else
248     {
249       push @nodelist, $nodelist;
250     }
251   }
252   foreach my $nodename (@nodelist)
253   {
254     Log (undef, "node $nodename - $ncpus slots");
255     my $node = { name => $nodename,
256                  ncpus => $ncpus,
257                  losing_streak => 0,
258                  hold_until => 0 };
259     foreach my $cpu (1..$ncpus)
260     {
261       push @slot, { node => $node,
262                     cpu => $cpu };
263     }
264   }
265   push @node, @nodelist;
266 }
267
268
269
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
272
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274
275
276 $Job->update_attributes(
277   'tasks_summary' => { 'failed' => 0,
278                        'todo' => 1,
279                        'running' => 0,
280                        'done' => 0 });
281
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
296
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
300
301
302 my @jobstep;
303 my @jobstep_todo = ();
304 my @jobstep_done = ();
305 my @jobstep_tomerge = ();
306 my $jobstep_tomerge_level = 0;
307 my $squeue_checked;
308 my $squeue_kill_checked;
309 my $output_in_keep = 0;
310 my $latest_refresh = scalar time;
311
312
313
314 if (defined $Job->{thawedfromkey})
315 {
316   thaw ($Job->{thawedfromkey});
317 }
318 else
319 {
320   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
321     'job_uuid' => $Job->{'uuid'},
322     'sequence' => 0,
323     'qsequence' => 0,
324     'parameters' => {},
325                                                           });
326   push @jobstep, { 'level' => 0,
327                    'failures' => 0,
328                    'arvados_task' => $first_task,
329                  };
330   push @jobstep_todo, 0;
331 }
332
333
334 if (!$have_slurm)
335 {
336   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
337 }
338
339
340 my $build_script;
341 do {
342   local $/ = undef;
343   $build_script = <DATA>;
344 };
345 my $nodelist = join(",", @node);
346
347 if (!defined $no_clear_tmp) {
348   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
349   Log (undef, "Clean work dirs");
350
351   my $cleanpid = fork();
352   if ($cleanpid == 0)
353   {
354     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
355           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
356     exit (1);
357   }
358   while (1)
359   {
360     last if $cleanpid == waitpid (-1, WNOHANG);
361     freeze_if_want_freeze ($cleanpid);
362     select (undef, undef, undef, 0.1);
363   }
364   Log (undef, "Cleanup command exited ".exit_status_s($?));
365 }
366
367
368 my $git_archive;
369 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
370   # If script_version looks like an absolute path, *and* the --git-dir
371   # argument was not given -- which implies we were not invoked by
372   # crunch-dispatch -- we will use the given path as a working
373   # directory instead of resolving script_version to a git commit (or
374   # doing anything else with git).
375   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
376   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
377 }
378 else {
379   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
380
381   # Install requested code version
382   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
383
384   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
385
386   # If we're running under crunch-dispatch, it will have already
387   # pulled the appropriate source tree into its own repository, and
388   # given us that repo's path as $git_dir.
389   #
390   # If we're running a "local" job, we might have to fetch content
391   # from a remote repository.
392   #
393   # (Currently crunch-dispatch gives a local path with --git-dir, but
394   # we might as well accept URLs there too in case it changes its
395   # mind.)
396   my $repo = $git_dir || $Job->{'repository'};
397
398   # Repository can be remote or local. If remote, we'll need to fetch it
399   # to a local dir before doing `git log` et al.
400   my $repo_location;
401
402   if ($repo =~ m{://|^[^/]*:}) {
403     # $repo is a git url we can clone, like git:// or https:// or
404     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
405     # not recognized here because distinguishing that from a local
406     # path is too fragile. If you really need something strange here,
407     # use the ssh:// form.
408     $repo_location = 'remote';
409   } elsif ($repo =~ m{^\.*/}) {
410     # $repo is a local path to a git index. We'll also resolve ../foo
411     # to ../foo/.git if the latter is a directory. To help
412     # disambiguate local paths from named hosted repositories, this
413     # form must be given as ./ or ../ if it's a relative path.
414     if (-d "$repo/.git") {
415       $repo = "$repo/.git";
416     }
417     $repo_location = 'local';
418   } else {
419     # $repo is none of the above. It must be the name of a hosted
420     # repository.
421     my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
422       'filters' => [['name','=',$repo]]
423         );
424     my @repos_found = @{$arv_repo_list->{'items'}};
425     my $n_found = $arv_repo_list->{'items_available'};
426     if ($n_found > 0) {
427       Log(undef, "Repository '$repo' -> "
428           . join(", ", map { $_->{'uuid'} } @repos_found));
429     }
430     if ($n_found != 1) {
431       croak("Error: Found $n_found repositories with name '$repo'.");
432     }
433     $repo = $repos_found[0]->{'fetch_url'};
434     $repo_location = 'remote';
435   }
436   Log(undef, "Using $repo_location repository '$repo'");
437   $ENV{"CRUNCH_SRC_URL"} = $repo;
438
439   # Resolve given script_version (we'll call that $treeish here) to a
440   # commit sha1 ($commit).
441   my $treeish = $Job->{'script_version'};
442   my $commit;
443   if ($repo_location eq 'remote') {
444     # We minimize excess object-fetching by re-using the same bare
445     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
446     # just keep adding remotes to it as needed.
447     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
448     my $gitcmd = "git --git-dir=\Q$local_repo\E";
449
450     # Set up our local repo for caching remote objects, making
451     # archives, etc.
452     if (!-d $local_repo) {
453       make_path($local_repo) or croak("Error: could not create $local_repo");
454     }
455     # This works (exits 0 and doesn't delete fetched objects) even
456     # if $local_repo is already initialized:
457     `$gitcmd init --bare`;
458     if ($?) {
459       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
460     }
461
462     # If $treeish looks like a hash (or abbrev hash) we look it up in
463     # our local cache first, since that's cheaper. (We don't want to
464     # do that with tags/branches though -- those change over time, so
465     # they should always be resolved by the remote repo.)
466     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
467       # Hide stderr because it's normal for this to fail:
468       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
469       if ($? == 0 &&
470           # Careful not to resolve a branch named abcdeff to commit 1234567:
471           $sha1 =~ /^$treeish/ &&
472           $sha1 =~ /^([0-9a-f]{40})$/s) {
473         $commit = $1;
474         Log(undef, "Commit $commit already present in $local_repo");
475       }
476     }
477
478     if (!defined $commit) {
479       # If $treeish isn't just a hash or abbrev hash, or isn't here
480       # yet, we need to fetch the remote to resolve it correctly.
481
482       # First, remove all local heads. This prevents a name that does
483       # not exist on the remote from resolving to (or colliding with)
484       # a previously fetched branch or tag (possibly from a different
485       # remote).
486       remove_tree("$local_repo/refs/heads", {keep_root => 1});
487
488       Log(undef, "Fetching objects from $repo to $local_repo");
489       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
490       if ($?) {
491         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
492       }
493     }
494
495     # Now that the data is all here, we will use our local repo for
496     # the rest of our git activities.
497     $repo = $local_repo;
498   }
499
500   my $gitcmd = "git --git-dir=\Q$repo\E";
501   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
502   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
503     croak("`$gitcmd rev-list` exited "
504           .exit_status_s($?)
505           .", '$treeish' not found. Giving up.");
506   }
507   $commit = $1;
508   Log(undef, "Version $treeish is commit $commit");
509
510   if ($commit ne $Job->{'script_version'}) {
511     # Record the real commit id in the database, frozentokey, logs,
512     # etc. -- instead of an abbreviation or a branch name which can
513     # become ambiguous or point to a different commit in the future.
514     if (!$Job->update_attributes('script_version' => $commit)) {
515       croak("Error: failed to update job's script_version attribute");
516     }
517   }
518
519   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
520   $git_archive = `$gitcmd archive ''\Q$commit\E`;
521   if ($?) {
522     croak("Error: $gitcmd archive exited ".exit_status_s($?));
523   }
524 }
525
526 if (!defined $git_archive) {
527   Log(undef, "Skip install phase (no git archive)");
528   if ($have_slurm) {
529     Log(undef, "Warning: This probably means workers have no source tree!");
530   }
531 }
532 else {
533   Log(undef, "Run install script on all workers");
534
535   my @srunargs = ("srun",
536                   "--nodelist=$nodelist",
537                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
538   my @execargs = ("sh", "-c",
539                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
540
541   # Note: this section is almost certainly unnecessary if we're
542   # running tasks in docker containers.
543   my $installpid = fork();
544   if ($installpid == 0)
545   {
546     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
547     exit (1);
548   }
549   while (1)
550   {
551     last if $installpid == waitpid (-1, WNOHANG);
552     freeze_if_want_freeze ($installpid);
553     select (undef, undef, undef, 0.1);
554   }
555   Log (undef, "Install script exited ".exit_status_s($?));
556 }
557
558 if (!$have_slurm)
559 {
560   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
561   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
562 }
563
564 # If this job requires a Docker image, install that.
565 my $docker_bin = "/usr/bin/docker.io";
566 my ($docker_locator, $docker_stream, $docker_hash);
567 if ($docker_locator = $Job->{docker_image_locator}) {
568   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
569   if (!$docker_hash)
570   {
571     croak("No Docker image hash found from locator $docker_locator");
572   }
573   $docker_stream =~ s/^\.//;
574   my $docker_install_script = qq{
575 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
576     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
577 fi
578 };
579   my $docker_pid = fork();
580   if ($docker_pid == 0)
581   {
582     srun (["srun", "--nodelist=" . join(',', @node)],
583           ["/bin/sh", "-ec", $docker_install_script]);
584     exit ($?);
585   }
586   while (1)
587   {
588     last if $docker_pid == waitpid (-1, WNOHANG);
589     freeze_if_want_freeze ($docker_pid);
590     select (undef, undef, undef, 0.1);
591   }
592   if ($? != 0)
593   {
594     croak("Installing Docker image from $docker_locator exited "
595           .exit_status_s($?));
596   }
597 }
598
599 foreach (qw (script script_version script_parameters runtime_constraints))
600 {
601   Log (undef,
602        "$_ " .
603        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
604 }
605 foreach (split (/\n/, $Job->{knobs}))
606 {
607   Log (undef, "knob " . $_);
608 }
609
610
611
612 $main::success = undef;
613
614
615
616 ONELEVEL:
617
618 my $thisround_succeeded = 0;
619 my $thisround_failed = 0;
620 my $thisround_failed_multiple = 0;
621
622 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
623                        or $a <=> $b } @jobstep_todo;
624 my $level = $jobstep[$jobstep_todo[0]]->{level};
625 Log (undef, "start level $level");
626
627
628
629 my %proc;
630 my @freeslot = (0..$#slot);
631 my @holdslot;
632 my %reader;
633 my $progress_is_dirty = 1;
634 my $progress_stats_updated = 0;
635
636 update_progress_stats();
637
638
639
640 THISROUND:
641 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
642 {
643   my $id = $jobstep_todo[$todo_ptr];
644   my $Jobstep = $jobstep[$id];
645   if ($Jobstep->{level} != $level)
646   {
647     next;
648   }
649
650   pipe $reader{$id}, "writer" or croak ($!);
651   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
652   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
653
654   my $childslot = $freeslot[0];
655   my $childnode = $slot[$childslot]->{node};
656   my $childslotname = join (".",
657                             $slot[$childslot]->{node}->{name},
658                             $slot[$childslot]->{cpu});
659   my $childpid = fork();
660   if ($childpid == 0)
661   {
662     $SIG{'INT'} = 'DEFAULT';
663     $SIG{'QUIT'} = 'DEFAULT';
664     $SIG{'TERM'} = 'DEFAULT';
665
666     foreach (values (%reader))
667     {
668       close($_);
669     }
670     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
671     open(STDOUT,">&writer");
672     open(STDERR,">&writer");
673
674     undef $dbh;
675     undef $sth;
676
677     delete $ENV{"GNUPGHOME"};
678     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
679     $ENV{"TASK_QSEQUENCE"} = $id;
680     $ENV{"TASK_SEQUENCE"} = $level;
681     $ENV{"JOB_SCRIPT"} = $Job->{script};
682     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
683       $param =~ tr/a-z/A-Z/;
684       $ENV{"JOB_PARAMETER_$param"} = $value;
685     }
686     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
687     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
688     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
689     $ENV{"HOME"} = $ENV{"TASK_WORK"};
690     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
691     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
692     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
693     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
694
695     $ENV{"GZIP"} = "-n";
696
697     my @srunargs = (
698       "srun",
699       "--nodelist=".$childnode->{name},
700       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
701       "--job-name=$job_id.$id.$$",
702         );
703     my $build_script_to_send = "";
704     my $command =
705         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
706         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
707         ."&& cd $ENV{CRUNCH_TMP} ";
708     if ($build_script)
709     {
710       $build_script_to_send = $build_script;
711       $command .=
712           "&& perl -";
713     }
714     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
715     if ($docker_hash)
716     {
717       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
718       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
719       # Dynamically configure the container to use the host system as its
720       # DNS server.  Get the host's global addresses from the ip command,
721       # and turn them into docker --dns options using gawk.
722       $command .=
723           q{$(ip -o address show scope global |
724               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
725       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
726       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
727       $command .= "--env=\QHOME=/home/crunch\E ";
728       while (my ($env_key, $env_val) = each %ENV)
729       {
730         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
731           if ($env_key eq "TASK_WORK") {
732             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
733           }
734           elsif ($env_key eq "TASK_KEEPMOUNT") {
735             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
736           }
737           else {
738             $command .= "--env=\Q$env_key=$env_val\E ";
739           }
740         }
741       }
742       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
743       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
744       $command .= "\Q$docker_hash\E ";
745       $command .= "stdbuf --output=0 --error=0 ";
746       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
747     } else {
748       # Non-docker run
749       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
750       $command .= "stdbuf --output=0 --error=0 ";
751       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
752     }
753
754     my @execargs = ('bash', '-c', $command);
755     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
756     # exec() failed, we assume nothing happened.
757     Log(undef, "srun() failed on build script");
758     die;
759   }
760   close("writer");
761   if (!defined $childpid)
762   {
763     close $reader{$id};
764     delete $reader{$id};
765     next;
766   }
767   shift @freeslot;
768   $proc{$childpid} = { jobstep => $id,
769                        time => time,
770                        slot => $childslot,
771                        jobstepname => "$job_id.$id.$childpid",
772                      };
773   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
774   $slot[$childslot]->{pid} = $childpid;
775
776   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
777   Log ($id, "child $childpid started on $childslotname");
778   $Jobstep->{starttime} = time;
779   $Jobstep->{node} = $childnode->{name};
780   $Jobstep->{slotindex} = $childslot;
781   delete $Jobstep->{stderr};
782   delete $Jobstep->{finishtime};
783
784   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
785   $Jobstep->{'arvados_task'}->save;
786
787   splice @jobstep_todo, $todo_ptr, 1;
788   --$todo_ptr;
789
790   $progress_is_dirty = 1;
791
792   while (!@freeslot
793          ||
794          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
795   {
796     last THISROUND if $main::please_freeze;
797     if ($main::please_info)
798     {
799       $main::please_info = 0;
800       freeze();
801       collate_output();
802       save_meta(1);
803       update_progress_stats();
804     }
805     my $gotsome
806         = readfrompipes ()
807         + reapchildren ();
808     if (!$gotsome)
809     {
810       check_refresh_wanted();
811       check_squeue();
812       update_progress_stats();
813       select (undef, undef, undef, 0.1);
814     }
815     elsif (time - $progress_stats_updated >= 30)
816     {
817       update_progress_stats();
818     }
819     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
820         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
821     {
822       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
823           .($thisround_failed+$thisround_succeeded)
824           .") -- giving up on this round";
825       Log (undef, $message);
826       last THISROUND;
827     }
828
829     # move slots from freeslot to holdslot (or back to freeslot) if necessary
830     for (my $i=$#freeslot; $i>=0; $i--) {
831       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
832         push @holdslot, (splice @freeslot, $i, 1);
833       }
834     }
835     for (my $i=$#holdslot; $i>=0; $i--) {
836       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
837         push @freeslot, (splice @holdslot, $i, 1);
838       }
839     }
840
841     # give up if no nodes are succeeding
842     if (!grep { $_->{node}->{losing_streak} == 0 &&
843                     $_->{node}->{hold_count} < 4 } @slot) {
844       my $message = "Every node has failed -- giving up on this round";
845       Log (undef, $message);
846       last THISROUND;
847     }
848   }
849 }
850
851
852 push @freeslot, splice @holdslot;
853 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
854
855
856 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
857 while (%proc)
858 {
859   if ($main::please_continue) {
860     $main::please_continue = 0;
861     goto THISROUND;
862   }
863   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
864   readfrompipes ();
865   if (!reapchildren())
866   {
867     check_refresh_wanted();
868     check_squeue();
869     update_progress_stats();
870     select (undef, undef, undef, 0.1);
871     killem (keys %proc) if $main::please_freeze;
872   }
873 }
874
875 update_progress_stats();
876 freeze_if_want_freeze();
877
878
879 if (!defined $main::success)
880 {
881   if (@jobstep_todo &&
882       $thisround_succeeded == 0 &&
883       ($thisround_failed == 0 || $thisround_failed > 4))
884   {
885     my $message = "stop because $thisround_failed tasks failed and none succeeded";
886     Log (undef, $message);
887     $main::success = 0;
888   }
889   if (!@jobstep_todo)
890   {
891     $main::success = 1;
892   }
893 }
894
895 goto ONELEVEL if !defined $main::success;
896
897
898 release_allocation();
899 freeze();
900 my $collated_output = &collate_output();
901
902 if (!$collated_output) {
903   Log(undef, "output undef");
904 }
905 else {
906   eval {
907     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
908         or die "failed to get collated manifest: $!";
909     my $orig_manifest_text = '';
910     while (my $manifest_line = <$orig_manifest>) {
911       $orig_manifest_text .= $manifest_line;
912     }
913     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
914       'manifest_text' => $orig_manifest_text,
915     });
916     Log(undef, "output uuid " . $output->{uuid});
917     Log(undef, "output hash " . $output->{portable_data_hash});
918     $Job->update_attributes('output' => $output->{portable_data_hash});
919   };
920   if ($@) {
921     Log (undef, "Failed to register output manifest: $@");
922   }
923 }
924
925 Log (undef, "finish");
926
927 save_meta();
928
929 my $final_state;
930 if ($collated_output && $main::success) {
931   $final_state = 'Complete';
932 } else {
933   $final_state = 'Failed';
934 }
935 $Job->update_attributes('state' => $final_state);
936
937 exit (($final_state eq 'Complete') ? 0 : 1);
938
939
940
941 sub update_progress_stats
942 {
943   $progress_stats_updated = time;
944   return if !$progress_is_dirty;
945   my ($todo, $done, $running) = (scalar @jobstep_todo,
946                                  scalar @jobstep_done,
947                                  scalar @slot - scalar @freeslot - scalar @holdslot);
948   $Job->{'tasks_summary'} ||= {};
949   $Job->{'tasks_summary'}->{'todo'} = $todo;
950   $Job->{'tasks_summary'}->{'done'} = $done;
951   $Job->{'tasks_summary'}->{'running'} = $running;
952   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
953   Log (undef, "status: $done done, $running running, $todo todo");
954   $progress_is_dirty = 0;
955 }
956
957
958
959 sub reapchildren
960 {
961   my $pid = waitpid (-1, WNOHANG);
962   return 0 if $pid <= 0;
963
964   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
965                   . "."
966                   . $slot[$proc{$pid}->{slot}]->{cpu});
967   my $jobstepid = $proc{$pid}->{jobstep};
968   my $elapsed = time - $proc{$pid}->{time};
969   my $Jobstep = $jobstep[$jobstepid];
970
971   my $childstatus = $?;
972   my $exitvalue = $childstatus >> 8;
973   my $exitinfo = "exit ".exit_status_s($childstatus);
974   $Jobstep->{'arvados_task'}->reload;
975   my $task_success = $Jobstep->{'arvados_task'}->{success};
976
977   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
978
979   if (!defined $task_success) {
980     # task did not indicate one way or the other --> fail
981     $Jobstep->{'arvados_task'}->{success} = 0;
982     $Jobstep->{'arvados_task'}->save;
983     $task_success = 0;
984   }
985
986   if (!$task_success)
987   {
988     my $temporary_fail;
989     $temporary_fail ||= $Jobstep->{node_fail};
990     $temporary_fail ||= ($exitvalue == 111);
991
992     ++$thisround_failed;
993     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
994
995     # Check for signs of a failed or misconfigured node
996     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
997         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
998       # Don't count this against jobstep failure thresholds if this
999       # node is already suspected faulty and srun exited quickly
1000       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1001           $elapsed < 5) {
1002         Log ($jobstepid, "blaming failure on suspect node " .
1003              $slot[$proc{$pid}->{slot}]->{node}->{name});
1004         $temporary_fail ||= 1;
1005       }
1006       ban_node_by_slot($proc{$pid}->{slot});
1007     }
1008
1009     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1010                              ++$Jobstep->{'failures'},
1011                              $temporary_fail ? 'temporary ' : 'permanent',
1012                              $elapsed));
1013
1014     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1015       # Give up on this task, and the whole job
1016       $main::success = 0;
1017       $main::please_freeze = 1;
1018     }
1019     # Put this task back on the todo queue
1020     push @jobstep_todo, $jobstepid;
1021     $Job->{'tasks_summary'}->{'failed'}++;
1022   }
1023   else
1024   {
1025     ++$thisround_succeeded;
1026     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1027     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1028     push @jobstep_done, $jobstepid;
1029     Log ($jobstepid, "success in $elapsed seconds");
1030   }
1031   $Jobstep->{exitcode} = $childstatus;
1032   $Jobstep->{finishtime} = time;
1033   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1034   $Jobstep->{'arvados_task'}->save;
1035   process_stderr ($jobstepid, $task_success);
1036   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1037
1038   close $reader{$jobstepid};
1039   delete $reader{$jobstepid};
1040   delete $slot[$proc{$pid}->{slot}]->{pid};
1041   push @freeslot, $proc{$pid}->{slot};
1042   delete $proc{$pid};
1043
1044   if ($task_success) {
1045     # Load new tasks
1046     my $newtask_list = [];
1047     my $newtask_results;
1048     do {
1049       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1050         'where' => {
1051           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1052         },
1053         'order' => 'qsequence',
1054         'offset' => scalar(@$newtask_list),
1055       );
1056       push(@$newtask_list, @{$newtask_results->{items}});
1057     } while (@{$newtask_results->{items}});
1058     foreach my $arvados_task (@$newtask_list) {
1059       my $jobstep = {
1060         'level' => $arvados_task->{'sequence'},
1061         'failures' => 0,
1062         'arvados_task' => $arvados_task
1063       };
1064       push @jobstep, $jobstep;
1065       push @jobstep_todo, $#jobstep;
1066     }
1067   }
1068
1069   $progress_is_dirty = 1;
1070   1;
1071 }
1072
1073 sub check_refresh_wanted
1074 {
1075   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1076   if (@stat && $stat[9] > $latest_refresh) {
1077     $latest_refresh = scalar time;
1078     my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1079     for my $attr ('cancelled_at',
1080                   'cancelled_by_user_uuid',
1081                   'cancelled_by_client_uuid',
1082                   'state') {
1083       $Job->{$attr} = $Job2->{$attr};
1084     }
1085     if ($Job->{'state'} ne "Running") {
1086       if ($Job->{'state'} eq "Cancelled") {
1087         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1088       } else {
1089         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1090       }
1091       $main::success = 0;
1092       $main::please_freeze = 1;
1093     }
1094   }
1095 }
1096
1097 sub check_squeue
1098 {
1099   # return if the kill list was checked <4 seconds ago
1100   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1101   {
1102     return;
1103   }
1104   $squeue_kill_checked = time;
1105
1106   # use killem() on procs whose killtime is reached
1107   for (keys %proc)
1108   {
1109     if (exists $proc{$_}->{killtime}
1110         && $proc{$_}->{killtime} <= time)
1111     {
1112       killem ($_);
1113     }
1114   }
1115
1116   # return if the squeue was checked <60 seconds ago
1117   if (defined $squeue_checked && $squeue_checked > time - 60)
1118   {
1119     return;
1120   }
1121   $squeue_checked = time;
1122
1123   if (!$have_slurm)
1124   {
1125     # here is an opportunity to check for mysterious problems with local procs
1126     return;
1127   }
1128
1129   # get a list of steps still running
1130   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1131   chop @squeue;
1132   if ($squeue[-1] ne "ok")
1133   {
1134     return;
1135   }
1136   pop @squeue;
1137
1138   # which of my jobsteps are running, according to squeue?
1139   my %ok;
1140   foreach (@squeue)
1141   {
1142     if (/^(\d+)\.(\d+) (\S+)/)
1143     {
1144       if ($1 eq $ENV{SLURM_JOBID})
1145       {
1146         $ok{$3} = 1;
1147       }
1148     }
1149   }
1150
1151   # which of my active child procs (>60s old) were not mentioned by squeue?
1152   foreach (keys %proc)
1153   {
1154     if ($proc{$_}->{time} < time - 60
1155         && !exists $ok{$proc{$_}->{jobstepname}}
1156         && !exists $proc{$_}->{killtime})
1157     {
1158       # kill this proc if it hasn't exited in 30 seconds
1159       $proc{$_}->{killtime} = time + 30;
1160     }
1161   }
1162 }
1163
1164
1165 sub release_allocation
1166 {
1167   if ($have_slurm)
1168   {
1169     Log (undef, "release job allocation");
1170     system "scancel $ENV{SLURM_JOBID}";
1171   }
1172 }
1173
1174
1175 sub readfrompipes
1176 {
1177   my $gotsome = 0;
1178   foreach my $job (keys %reader)
1179   {
1180     my $buf;
1181     while (0 < sysread ($reader{$job}, $buf, 8192))
1182     {
1183       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1184       $jobstep[$job]->{stderr} .= $buf;
1185       preprocess_stderr ($job);
1186       if (length ($jobstep[$job]->{stderr}) > 16384)
1187       {
1188         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1189       }
1190       $gotsome = 1;
1191     }
1192   }
1193   return $gotsome;
1194 }
1195
1196
1197 sub preprocess_stderr
1198 {
1199   my $job = shift;
1200
1201   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1202     my $line = $1;
1203     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1204     Log ($job, "stderr $line");
1205     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1206       # whoa.
1207       $main::please_freeze = 1;
1208     }
1209     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1210       $jobstep[$job]->{node_fail} = 1;
1211       ban_node_by_slot($jobstep[$job]->{slotindex});
1212     }
1213   }
1214 }
1215
1216
1217 sub process_stderr
1218 {
1219   my $job = shift;
1220   my $task_success = shift;
1221   preprocess_stderr ($job);
1222
1223   map {
1224     Log ($job, "stderr $_");
1225   } split ("\n", $jobstep[$job]->{stderr});
1226 }
1227
1228 sub fetch_block
1229 {
1230   my $hash = shift;
1231   my ($keep, $child_out, $output_block);
1232
1233   my $cmd = "arv-get \Q$hash\E";
1234   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1235   $output_block = '';
1236   while (1) {
1237     my $buf;
1238     my $bytes = sysread($keep, $buf, 1024 * 1024);
1239     if (!defined $bytes) {
1240       die "reading from arv-get: $!";
1241     } elsif ($bytes == 0) {
1242       # sysread returns 0 at the end of the pipe.
1243       last;
1244     } else {
1245       # some bytes were read into buf.
1246       $output_block .= $buf;
1247     }
1248   }
1249   close $keep;
1250   return $output_block;
1251 }
1252
1253 sub collate_output
1254 {
1255   Log (undef, "collate");
1256
1257   my ($child_out, $child_in);
1258   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1259                   '--retries', put_retry_count());
1260   my $joboutput;
1261   for (@jobstep)
1262   {
1263     next if (!exists $_->{'arvados_task'}->{'output'} ||
1264              !$_->{'arvados_task'}->{'success'});
1265     my $output = $_->{'arvados_task'}->{output};
1266     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1267     {
1268       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1269       print $child_in $output;
1270     }
1271     elsif (@jobstep == 1)
1272     {
1273       $joboutput = $output;
1274       last;
1275     }
1276     elsif (defined (my $outblock = fetch_block ($output)))
1277     {
1278       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1279       print $child_in $outblock;
1280     }
1281     else
1282     {
1283       Log (undef, "XXX fetch_block($output) failed XXX");
1284       $main::success = 0;
1285     }
1286   }
1287   $child_in->close;
1288
1289   if (!defined $joboutput) {
1290     my $s = IO::Select->new($child_out);
1291     if ($s->can_read(120)) {
1292       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1293       chomp($joboutput);
1294       # TODO: Ensure exit status == 0.
1295     } else {
1296       Log (undef, "timed out reading from 'arv-put'");
1297     }
1298   }
1299   # TODO: kill $pid instead of waiting, now that we've decided to
1300   # ignore further output.
1301   waitpid($pid, 0);
1302
1303   return $joboutput;
1304 }
1305
1306
1307 sub killem
1308 {
1309   foreach (@_)
1310   {
1311     my $sig = 2;                # SIGINT first
1312     if (exists $proc{$_}->{"sent_$sig"} &&
1313         time - $proc{$_}->{"sent_$sig"} > 4)
1314     {
1315       $sig = 15;                # SIGTERM if SIGINT doesn't work
1316     }
1317     if (exists $proc{$_}->{"sent_$sig"} &&
1318         time - $proc{$_}->{"sent_$sig"} > 4)
1319     {
1320       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1321     }
1322     if (!exists $proc{$_}->{"sent_$sig"})
1323     {
1324       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1325       kill $sig, $_;
1326       select (undef, undef, undef, 0.1);
1327       if ($sig == 2)
1328       {
1329         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1330       }
1331       $proc{$_}->{"sent_$sig"} = time;
1332       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1333     }
1334   }
1335 }
1336
1337
1338 sub fhbits
1339 {
1340   my($bits);
1341   for (@_) {
1342     vec($bits,fileno($_),1) = 1;
1343   }
1344   $bits;
1345 }
1346
1347
1348 # Send log output to Keep via arv-put.
1349 #
1350 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1351 # $log_pipe_pid is the pid of the arv-put subprocess.
1352 #
1353 # The only functions that should access these variables directly are:
1354 #
1355 # log_writer_start($logfilename)
1356 #     Starts an arv-put pipe, reading data on stdin and writing it to
1357 #     a $logfilename file in an output collection.
1358 #
1359 # log_writer_send($txt)
1360 #     Writes $txt to the output log collection.
1361 #
1362 # log_writer_finish()
1363 #     Closes the arv-put pipe and returns the output that it produces.
1364 #
1365 # log_writer_is_active()
1366 #     Returns a true value if there is currently a live arv-put
1367 #     process, false otherwise.
1368 #
1369 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1370
1371 sub log_writer_start($)
1372 {
1373   my $logfilename = shift;
1374   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1375                         'arv-put', '--portable-data-hash',
1376                         '--retries', '3',
1377                         '--filename', $logfilename,
1378                         '-');
1379 }
1380
1381 sub log_writer_send($)
1382 {
1383   my $txt = shift;
1384   print $log_pipe_in $txt;
1385 }
1386
1387 sub log_writer_finish()
1388 {
1389   return unless $log_pipe_pid;
1390
1391   close($log_pipe_in);
1392   my $arv_put_output;
1393
1394   my $s = IO::Select->new($log_pipe_out);
1395   if ($s->can_read(120)) {
1396     sysread($log_pipe_out, $arv_put_output, 1024);
1397     chomp($arv_put_output);
1398   } else {
1399     Log (undef, "timed out reading from 'arv-put'");
1400   }
1401
1402   waitpid($log_pipe_pid, 0);
1403   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1404   if ($?) {
1405     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1406   }
1407
1408   return $arv_put_output;
1409 }
1410
1411 sub log_writer_is_active() {
1412   return $log_pipe_pid;
1413 }
1414
1415 sub Log                         # ($jobstep_id, $logmessage)
1416 {
1417   if ($_[1] =~ /\n/) {
1418     for my $line (split (/\n/, $_[1])) {
1419       Log ($_[0], $line);
1420     }
1421     return;
1422   }
1423   my $fh = select STDERR; $|=1; select $fh;
1424   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1425   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1426   $message .= "\n";
1427   my $datetime;
1428   if (log_writer_is_active() || -t STDERR) {
1429     my @gmtime = gmtime;
1430     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1431                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1432   }
1433   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1434
1435   if (log_writer_is_active()) {
1436     log_writer_send($datetime . " " . $message);
1437   }
1438 }
1439
1440
1441 sub croak
1442 {
1443   my ($package, $file, $line) = caller;
1444   my $message = "@_ at $file line $line\n";
1445   Log (undef, $message);
1446   freeze() if @jobstep_todo;
1447   collate_output() if @jobstep_todo;
1448   cleanup();
1449   save_meta();
1450   die;
1451 }
1452
1453
1454 sub cleanup
1455 {
1456   return unless $Job;
1457   if ($Job->{'state'} eq 'Cancelled') {
1458     $Job->update_attributes('finished_at' => scalar gmtime);
1459   } else {
1460     $Job->update_attributes('state' => 'Failed');
1461   }
1462 }
1463
1464
1465 sub save_meta
1466 {
1467   my $justcheckpoint = shift; # false if this will be the last meta saved
1468   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1469   return unless log_writer_is_active();
1470
1471   my $loglocator = log_writer_finish();
1472   Log (undef, "log manifest is $loglocator");
1473   $Job->{'log'} = $loglocator;
1474   $Job->update_attributes('log', $loglocator);
1475 }
1476
1477
1478 sub freeze_if_want_freeze
1479 {
1480   if ($main::please_freeze)
1481   {
1482     release_allocation();
1483     if (@_)
1484     {
1485       # kill some srun procs before freeze+stop
1486       map { $proc{$_} = {} } @_;
1487       while (%proc)
1488       {
1489         killem (keys %proc);
1490         select (undef, undef, undef, 0.1);
1491         my $died;
1492         while (($died = waitpid (-1, WNOHANG)) > 0)
1493         {
1494           delete $proc{$died};
1495         }
1496       }
1497     }
1498     freeze();
1499     collate_output();
1500     cleanup();
1501     save_meta();
1502     exit 1;
1503   }
1504 }
1505
1506
1507 sub freeze
1508 {
1509   Log (undef, "Freeze not implemented");
1510   return;
1511 }
1512
1513
1514 sub thaw
1515 {
1516   croak ("Thaw not implemented");
1517 }
1518
1519
1520 sub freezequote
1521 {
1522   my $s = shift;
1523   $s =~ s/\\/\\\\/g;
1524   $s =~ s/\n/\\n/g;
1525   return $s;
1526 }
1527
1528
1529 sub freezeunquote
1530 {
1531   my $s = shift;
1532   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1533   return $s;
1534 }
1535
1536
1537 sub srun
1538 {
1539   my $srunargs = shift;
1540   my $execargs = shift;
1541   my $opts = shift || {};
1542   my $stdin = shift;
1543   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1544   print STDERR (join (" ",
1545                       map { / / ? "'$_'" : $_ }
1546                       (@$args)),
1547                 "\n")
1548       if $ENV{CRUNCH_DEBUG};
1549
1550   if (defined $stdin) {
1551     my $child = open STDIN, "-|";
1552     defined $child or die "no fork: $!";
1553     if ($child == 0) {
1554       print $stdin or die $!;
1555       close STDOUT or die $!;
1556       exit 0;
1557     }
1558   }
1559
1560   return system (@$args) if $opts->{fork};
1561
1562   exec @$args;
1563   warn "ENV size is ".length(join(" ",%ENV));
1564   die "exec failed: $!: @$args";
1565 }
1566
1567
1568 sub ban_node_by_slot {
1569   # Don't start any new jobsteps on this node for 60 seconds
1570   my $slotid = shift;
1571   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1572   $slot[$slotid]->{node}->{hold_count}++;
1573   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1574 }
1575
1576 sub must_lock_now
1577 {
1578   my ($lockfile, $error_message) = @_;
1579   open L, ">", $lockfile or croak("$lockfile: $!");
1580   if (!flock L, LOCK_EX|LOCK_NB) {
1581     croak("Can't lock $lockfile: $error_message\n");
1582   }
1583 }
1584
1585 sub find_docker_image {
1586   # Given a Keep locator, check to see if it contains a Docker image.
1587   # If so, return its stream name and Docker hash.
1588   # If not, return undef for both values.
1589   my $locator = shift;
1590   my ($streamname, $filename);
1591   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1592     foreach my $line (split(/\n/, $image->{manifest_text})) {
1593       my @tokens = split(/\s+/, $line);
1594       next if (!@tokens);
1595       $streamname = shift(@tokens);
1596       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1597         if (defined($filename)) {
1598           return (undef, undef);  # More than one file in the Collection.
1599         } else {
1600           $filename = (split(/:/, $filedata, 3))[2];
1601         }
1602       }
1603     }
1604   }
1605   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1606     return ($streamname, $1);
1607   } else {
1608     return (undef, undef);
1609   }
1610 }
1611
1612 sub put_retry_count {
1613   # Calculate a --retries argument for arv-put that will have it try
1614   # approximately as long as this Job has been running.
1615   my $stoptime = shift || time;
1616   my $starttime = $jobstep[0]->{starttime};
1617   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1618   my $retries = 0;
1619   while ($timediff >= 2) {
1620     $retries++;
1621     $timediff /= 2;
1622   }
1623   return ($retries > 3) ? $retries : 3;
1624 }
1625
1626 sub exit_status_s {
1627   # Given a $?, return a human-readable exit code string like "0" or
1628   # "1" or "0 with signal 1" or "1 with signal 11".
1629   my $exitcode = shift;
1630   my $s = $exitcode >> 8;
1631   if ($exitcode & 0x7f) {
1632     $s .= " with signal " . ($exitcode & 0x7f);
1633   }
1634   if ($exitcode & 0x80) {
1635     $s .= " with core dump";
1636   }
1637   return $s;
1638 }
1639
1640 __DATA__
1641 #!/usr/bin/perl
1642
1643 # checkout-and-build
1644
1645 use Fcntl ':flock';
1646 use File::Path qw( make_path );
1647
1648 my $destdir = $ENV{"CRUNCH_SRC"};
1649 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1650 my $repo = $ENV{"CRUNCH_SRC_URL"};
1651 my $task_work = $ENV{"TASK_WORK"};
1652
1653 for my $dir ($destdir, $task_work) {
1654     if ($dir) {
1655         make_path $dir;
1656         -e $dir or die "Failed to create temporary directory ($dir): $!";
1657     }
1658 }
1659
1660 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1661 flock L, LOCK_EX;
1662 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1663     if (@ARGV) {
1664         exec(@ARGV);
1665         die "Cannot exec `@ARGV`: $!";
1666     } else {
1667         exit 0;
1668     }
1669 }
1670
1671 unlink "$destdir.commit";
1672 open STDOUT, ">", "$destdir.log";
1673 open STDERR, ">&STDOUT";
1674
1675 mkdir $destdir;
1676 my @git_archive_data = <DATA>;
1677 if (@git_archive_data) {
1678   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1679   print TARX @git_archive_data;
1680   if(!close(TARX)) {
1681     die "'tar -C $destdir -xf -' exited $?: $!";
1682   }
1683 }
1684
1685 my $pwd;
1686 chomp ($pwd = `pwd`);
1687 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1688 mkdir $install_dir;
1689
1690 for my $src_path ("$destdir/arvados/sdk/python") {
1691   if (-d $src_path) {
1692     shell_or_die ("virtualenv", $install_dir);
1693     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1694   }
1695 }
1696
1697 if (-e "$destdir/crunch_scripts/install") {
1698     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1699 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1700     # Old version
1701     shell_or_die ("./tests/autotests.sh", $install_dir);
1702 } elsif (-e "./install.sh") {
1703     shell_or_die ("./install.sh", $install_dir);
1704 }
1705
1706 if ($commit) {
1707     unlink "$destdir.commit.new";
1708     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1709     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1710 }
1711
1712 close L;
1713
1714 if (@ARGV) {
1715     exec(@ARGV);
1716     die "Cannot exec `@ARGV`: $!";
1717 } else {
1718     exit 0;
1719 }
1720
1721 sub shell_or_die
1722 {
1723   if ($ENV{"DEBUG"}) {
1724     print STDERR "@_\n";
1725   }
1726   system (@_) == 0
1727       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1728 }
1729
1730 __DATA__