4aecb0e8123fd53a19e9f23f6d885c7b00d686b2
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Digest::MD5 qw(md5_hex);
90 use Getopt::Long;
91 use IPC::Open2;
92 use IO::Select;
93 use File::Temp;
94 use Fcntl ':flock';
95 use File::Path qw( make_path remove_tree );
96
97 use constant EX_TEMPFAIL => 75;
98
99 $ENV{"TMPDIR"} ||= "/tmp";
100 unless (defined $ENV{"CRUNCH_TMP"}) {
101   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
102   if ($ENV{"USER"} ne "crunch" && $< != 0) {
103     # use a tmp dir unique for my uid
104     $ENV{"CRUNCH_TMP"} .= "-$<";
105   }
106 }
107
108 # Create the tmp directory if it does not exist
109 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
110   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
111 }
112
113 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
114 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
115 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
116 mkdir ($ENV{"JOB_WORK"});
117
118 my $force_unlock;
119 my $git_dir;
120 my $jobspec;
121 my $job_api_token;
122 my $no_clear_tmp;
123 my $resume_stash;
124 GetOptions('force-unlock' => \$force_unlock,
125            'git-dir=s' => \$git_dir,
126            'job=s' => \$jobspec,
127            'job-api-token=s' => \$job_api_token,
128            'no-clear-tmp' => \$no_clear_tmp,
129            'resume-stash=s' => \$resume_stash,
130     );
131
132 if (defined $job_api_token) {
133   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
134 }
135
136 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
137 my $local_job = 0;
138
139
140 $SIG{'USR1'} = sub
141 {
142   $main::ENV{CRUNCH_DEBUG} = 1;
143 };
144 $SIG{'USR2'} = sub
145 {
146   $main::ENV{CRUNCH_DEBUG} = 0;
147 };
148
149
150
151 my $arv = Arvados->new('apiVersion' => 'v1');
152
153 my $User = $arv->{'users'}->{'current'}->execute;
154
155 my $Job;
156 my $job_id;
157 my $dbh;
158 my $sth;
159 if ($jobspec =~ /^[-a-z\d]+$/)
160 {
161   # $jobspec is an Arvados UUID, not a JSON job specification
162   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
163   if (!$force_unlock) {
164     # Claim this job, and make sure nobody else does
165     eval {
166       # lock() sets is_locked_by_uuid and changes state to Running.
167       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
168     };
169     if ($@) {
170       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
171       exit EX_TEMPFAIL;
172     };
173   }
174 }
175 else
176 {
177   $Job = JSON::decode_json($jobspec);
178
179   if (!$resume_stash)
180   {
181     map { croak ("No $_ specified") unless $Job->{$_} }
182     qw(script script_version script_parameters);
183   }
184
185   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186   $Job->{'started_at'} = gmtime;
187   $Job->{'state'} = 'Running';
188
189   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
190 }
191 $job_id = $Job->{'uuid'};
192
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
195
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
199
200
201 Log (undef, "check slurm allocation");
202 my @slot;
203 my @node;
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
205 my @sinfo;
206 if (!$have_slurm)
207 {
208   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209   push @sinfo, "$localcpus localhost";
210 }
211 if (exists $ENV{SLURM_NODELIST})
212 {
213   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
214 }
215 foreach (@sinfo)
216 {
217   my ($ncpus, $slurm_nodelist) = split;
218   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
219
220   my @nodelist;
221   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
222   {
223     my $nodelist = $1;
224     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
225     {
226       my $ranges = $1;
227       foreach (split (",", $ranges))
228       {
229         my ($a, $b);
230         if (/(\d+)-(\d+)/)
231         {
232           $a = $1;
233           $b = $2;
234         }
235         else
236         {
237           $a = $_;
238           $b = $_;
239         }
240         push @nodelist, map {
241           my $n = $nodelist;
242           $n =~ s/\[[-,\d]+\]/$_/;
243           $n;
244         } ($a..$b);
245       }
246     }
247     else
248     {
249       push @nodelist, $nodelist;
250     }
251   }
252   foreach my $nodename (@nodelist)
253   {
254     Log (undef, "node $nodename - $ncpus slots");
255     my $node = { name => $nodename,
256                  ncpus => $ncpus,
257                  losing_streak => 0,
258                  hold_until => 0 };
259     foreach my $cpu (1..$ncpus)
260     {
261       push @slot, { node => $node,
262                     cpu => $cpu };
263     }
264   }
265   push @node, @nodelist;
266 }
267
268
269
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
272
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274
275
276 $Job->update_attributes(
277   'tasks_summary' => { 'failed' => 0,
278                        'todo' => 1,
279                        'running' => 0,
280                        'done' => 0 });
281
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
296
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
300
301
302 my @jobstep;
303 my @jobstep_todo = ();
304 my @jobstep_done = ();
305 my @jobstep_tomerge = ();
306 my $jobstep_tomerge_level = 0;
307 my $squeue_checked;
308 my $squeue_kill_checked;
309 my $output_in_keep = 0;
310 my $latest_refresh = scalar time;
311
312
313
314 if (defined $Job->{thawedfromkey})
315 {
316   thaw ($Job->{thawedfromkey});
317 }
318 else
319 {
320   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
321     'job_uuid' => $Job->{'uuid'},
322     'sequence' => 0,
323     'qsequence' => 0,
324     'parameters' => {},
325                                                           });
326   push @jobstep, { 'level' => 0,
327                    'failures' => 0,
328                    'arvados_task' => $first_task,
329                  };
330   push @jobstep_todo, 0;
331 }
332
333
334 if (!$have_slurm)
335 {
336   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
337 }
338
339
340 my $build_script;
341 do {
342   local $/ = undef;
343   $build_script = <DATA>;
344 };
345 my $nodelist = join(",", @node);
346
347 if (!defined $no_clear_tmp) {
348   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
349   Log (undef, "Clean work dirs");
350
351   my $cleanpid = fork();
352   if ($cleanpid == 0)
353   {
354     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
355           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
356     exit (1);
357   }
358   while (1)
359   {
360     last if $cleanpid == waitpid (-1, WNOHANG);
361     freeze_if_want_freeze ($cleanpid);
362     select (undef, undef, undef, 0.1);
363   }
364   Log (undef, "Cleanup command exited ".exit_status_s($?));
365 }
366
367
368 my $git_archive;
369 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
370   # If we're in user-land (i.e., not called from crunch-dispatch)
371   # script_version can be an absolute directory path, signifying we
372   # should work straight out of that directory instead of using a git
373   # commit.
374   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
375   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
376 }
377 else {
378   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
379
380   # Install requested code version
381   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
382
383   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
384
385   # If we're running under crunch-dispatch, it will have already
386   # pulled the appropriate source tree into its own repository, and
387   # given us that repo's path as $git_dir.
388   #
389   # If we're running a "local" job, we might have to fetch content
390   # from a remote repository.
391   #
392   # (Currently crunch-dispatch gives a local path with --git-dir, but
393   # we might as well accept URLs there too in case it changes its
394   # mind.)
395   my $repo = $git_dir || $Job->{'repository'};
396
397   # Repository can be remote or local. If remote, we'll need to fetch it
398   # to a local dir before doing `git log` et al.
399   my $repo_location;
400
401   if ($repo =~ m{://|^[^/]*:}) {
402     # $repo is a git url we can clone, like git:// or https:// or
403     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
404     # not recognized here because distinguishing that from a local
405     # path is too fragile. If you really need something strange here,
406     # use the ssh:// form.
407     $repo_location = 'remote';
408   } elsif ($repo =~ m{^\.*/}) {
409     # $repo is a local path to a git index. We'll also resolve ../foo
410     # to ../foo/.git if the latter is a directory. To help
411     # disambiguate local paths from named hosted repositories, this
412     # form must be given as ./ or ../ if it's a relative path.
413     if (-d "$repo/.git") {
414       $repo = "$repo/.git";
415     }
416     $repo_location = 'local';
417   } else {
418     # $repo is none of the above. It must be the name of a hosted
419     # repository.
420     my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
421       'filters' => [['name','=',$repo]]
422         );
423     my @repos_found = @{$arv_repo_list->{'items'}};
424     my $n_found = $arv_repo_list->{'items_available'};
425     if ($n_found > 0) {
426       Log(undef, "Repository '$repo' -> "
427           . join(", ", map { $_->{'uuid'} } @repos_found));
428     }
429     if ($n_found != 1) {
430       croak("Error: Found $n_found repositories with name '$repo'.");
431     }
432     $repo = $repos_found[0]->{'fetch_url'};
433     $repo_location = 'remote';
434   }
435   Log(undef, "Using $repo_location repository '$repo'");
436   $ENV{"CRUNCH_SRC_URL"} = $repo;
437
438   # Resolve given script_version (we'll call that $treeish here) to a
439   # commit sha1 ($commit).
440   my $treeish = $Job->{'script_version'};
441   my $commit;
442   if ($repo_location eq 'remote') {
443     # We minimize excess object-fetching by re-using the same bare
444     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
445     # just keep adding remotes to it as needed.
446     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
447     my $gitcmd = "git --git-dir=\Q$local_repo\E";
448
449     # Set up our local repo for caching remote objects, making
450     # archives, etc.
451     if (!-d $local_repo) {
452       make_path($local_repo) or croak("Error: could not create $local_repo");
453     }
454     # This works (exits 0 and doesn't delete fetched objects) even
455     # if $local_repo is already initialized:
456     `$gitcmd init --bare`;
457     if ($?) {
458       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
459     }
460
461     # If $treeish looks like a hash (or abbrev hash) we look it up in
462     # our local cache first, since that's cheaper. (We don't want to
463     # do that with tags/branches though -- those change over time, so
464     # they should always be resolved by the remote repo.)
465     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
466       # Hide stderr because it's normal for this to fail:
467       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
468       if ($? == 0 &&
469           # Careful not to resolve a branch named abcdeff to commit 1234567:
470           $sha1 =~ /^$treeish/ &&
471           $sha1 =~ /^([0-9a-f]{40})$/s) {
472         $commit = $1;
473         Log(undef, "Commit $commit already present in $local_repo");
474       }
475     }
476
477     if (!defined $commit) {
478       # If $treeish isn't just a hash or abbrev hash, or isn't here
479       # yet, we need to fetch the remote to resolve it correctly.
480
481       # First, remove all local heads. This prevents a name that does
482       # not exist on the remote from resolving to (or colliding with)
483       # a previously fetched branch or tag (possibly from a different
484       # remote).
485       remove_tree("$local_repo/refs/heads", {keep_root => 1});
486
487       Log(undef, "Fetching objects from $repo to $local_repo");
488       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
489       if ($?) {
490         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
491       }
492     }
493
494     # Now that the data is all here, we will use our local repo for
495     # the rest of our git activities.
496     $repo = $local_repo;
497   }
498
499   my $gitcmd = "git --git-dir=\Q$repo\E";
500   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
501   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
502     croak("`$gitcmd rev-list` exited "
503           .exit_status_s($?)
504           .", '$treeish' not found. Giving up.");
505   }
506   $commit = $1;
507   Log(undef, "Version $treeish is commit $commit");
508
509   if ($commit ne $Job->{'script_version'}) {
510     # Record the real commit id in the database, frozentokey, logs,
511     # etc. -- instead of an abbreviation or a branch name which can
512     # become ambiguous or point to a different commit in the future.
513     if (!$Job->update_attributes('script_version' => $commit)) {
514       croak("Error: failed to update job's script_version attribute");
515     }
516   }
517
518   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
519   $git_archive = `$gitcmd archive ''\Q$commit\E`;
520   if ($?) {
521     croak("Error: $gitcmd archive exited ".exit_status_s($?));
522   }
523 }
524
525 if (!defined $git_archive) {
526   Log(undef, "Skip install phase (no git archive)");
527   if ($have_slurm) {
528     Log(undef, "Warning: This probably means workers have no source tree!");
529   }
530 }
531 else {
532   Log(undef, "Run install script on all workers");
533
534   my @srunargs = ("srun",
535                   "--nodelist=$nodelist",
536                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
537   my @execargs = ("sh", "-c",
538                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
539
540   # Note: this section is almost certainly unnecessary if we're
541   # running tasks in docker containers.
542   my $installpid = fork();
543   if ($installpid == 0)
544   {
545     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
546     exit (1);
547   }
548   while (1)
549   {
550     last if $installpid == waitpid (-1, WNOHANG);
551     freeze_if_want_freeze ($installpid);
552     select (undef, undef, undef, 0.1);
553   }
554   Log (undef, "Install script exited ".exit_status_s($?));
555 }
556
557 if (!$have_slurm)
558 {
559   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
560   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
561 }
562
563 # If this job requires a Docker image, install that.
564 my $docker_bin = "/usr/bin/docker.io";
565 my ($docker_locator, $docker_stream, $docker_hash);
566 if ($docker_locator = $Job->{docker_image_locator}) {
567   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
568   if (!$docker_hash)
569   {
570     croak("No Docker image hash found from locator $docker_locator");
571   }
572   $docker_stream =~ s/^\.//;
573   my $docker_install_script = qq{
574 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
575     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
576 fi
577 };
578   my $docker_pid = fork();
579   if ($docker_pid == 0)
580   {
581     srun (["srun", "--nodelist=" . join(',', @node)],
582           ["/bin/sh", "-ec", $docker_install_script]);
583     exit ($?);
584   }
585   while (1)
586   {
587     last if $docker_pid == waitpid (-1, WNOHANG);
588     freeze_if_want_freeze ($docker_pid);
589     select (undef, undef, undef, 0.1);
590   }
591   if ($? != 0)
592   {
593     croak("Installing Docker image from $docker_locator exited "
594           .exit_status_s($?));
595   }
596 }
597
598 foreach (qw (script script_version script_parameters runtime_constraints))
599 {
600   Log (undef,
601        "$_ " .
602        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
603 }
604 foreach (split (/\n/, $Job->{knobs}))
605 {
606   Log (undef, "knob " . $_);
607 }
608
609
610
611 $main::success = undef;
612
613
614
615 ONELEVEL:
616
617 my $thisround_succeeded = 0;
618 my $thisround_failed = 0;
619 my $thisround_failed_multiple = 0;
620
621 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
622                        or $a <=> $b } @jobstep_todo;
623 my $level = $jobstep[$jobstep_todo[0]]->{level};
624 Log (undef, "start level $level");
625
626
627
628 my %proc;
629 my @freeslot = (0..$#slot);
630 my @holdslot;
631 my %reader;
632 my $progress_is_dirty = 1;
633 my $progress_stats_updated = 0;
634
635 update_progress_stats();
636
637
638
639 THISROUND:
640 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
641 {
642   my $id = $jobstep_todo[$todo_ptr];
643   my $Jobstep = $jobstep[$id];
644   if ($Jobstep->{level} != $level)
645   {
646     next;
647   }
648
649   pipe $reader{$id}, "writer" or croak ($!);
650   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
651   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
652
653   my $childslot = $freeslot[0];
654   my $childnode = $slot[$childslot]->{node};
655   my $childslotname = join (".",
656                             $slot[$childslot]->{node}->{name},
657                             $slot[$childslot]->{cpu});
658   my $childpid = fork();
659   if ($childpid == 0)
660   {
661     $SIG{'INT'} = 'DEFAULT';
662     $SIG{'QUIT'} = 'DEFAULT';
663     $SIG{'TERM'} = 'DEFAULT';
664
665     foreach (values (%reader))
666     {
667       close($_);
668     }
669     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
670     open(STDOUT,">&writer");
671     open(STDERR,">&writer");
672
673     undef $dbh;
674     undef $sth;
675
676     delete $ENV{"GNUPGHOME"};
677     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
678     $ENV{"TASK_QSEQUENCE"} = $id;
679     $ENV{"TASK_SEQUENCE"} = $level;
680     $ENV{"JOB_SCRIPT"} = $Job->{script};
681     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
682       $param =~ tr/a-z/A-Z/;
683       $ENV{"JOB_PARAMETER_$param"} = $value;
684     }
685     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
686     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
687     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
688     $ENV{"HOME"} = $ENV{"TASK_WORK"};
689     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
690     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
691     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
692     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
693
694     $ENV{"GZIP"} = "-n";
695
696     my @srunargs = (
697       "srun",
698       "--nodelist=".$childnode->{name},
699       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
700       "--job-name=$job_id.$id.$$",
701         );
702     my $build_script_to_send = "";
703     my $command =
704         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
705         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
706         ."&& cd $ENV{CRUNCH_TMP} ";
707     if ($build_script)
708     {
709       $build_script_to_send = $build_script;
710       $command .=
711           "&& perl -";
712     }
713     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
714     if ($docker_hash)
715     {
716       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
717       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
718       # Dynamically configure the container to use the host system as its
719       # DNS server.  Get the host's global addresses from the ip command,
720       # and turn them into docker --dns options using gawk.
721       $command .=
722           q{$(ip -o address show scope global |
723               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
724       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
725       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
726       $command .= "--env=\QHOME=/home/crunch\E ";
727       while (my ($env_key, $env_val) = each %ENV)
728       {
729         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
730           if ($env_key eq "TASK_WORK") {
731             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
732           }
733           elsif ($env_key eq "TASK_KEEPMOUNT") {
734             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
735           }
736           else {
737             $command .= "--env=\Q$env_key=$env_val\E ";
738           }
739         }
740       }
741       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
742       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
743       $command .= "\Q$docker_hash\E ";
744       $command .= "stdbuf --output=0 --error=0 ";
745       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
746     } else {
747       # Non-docker run
748       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
749       $command .= "stdbuf --output=0 --error=0 ";
750       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
751     }
752
753     my @execargs = ('bash', '-c', $command);
754     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
755     # exec() failed, we assume nothing happened.
756     Log(undef, "srun() failed on build script");
757     die;
758   }
759   close("writer");
760   if (!defined $childpid)
761   {
762     close $reader{$id};
763     delete $reader{$id};
764     next;
765   }
766   shift @freeslot;
767   $proc{$childpid} = { jobstep => $id,
768                        time => time,
769                        slot => $childslot,
770                        jobstepname => "$job_id.$id.$childpid",
771                      };
772   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
773   $slot[$childslot]->{pid} = $childpid;
774
775   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
776   Log ($id, "child $childpid started on $childslotname");
777   $Jobstep->{starttime} = time;
778   $Jobstep->{node} = $childnode->{name};
779   $Jobstep->{slotindex} = $childslot;
780   delete $Jobstep->{stderr};
781   delete $Jobstep->{finishtime};
782
783   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
784   $Jobstep->{'arvados_task'}->save;
785
786   splice @jobstep_todo, $todo_ptr, 1;
787   --$todo_ptr;
788
789   $progress_is_dirty = 1;
790
791   while (!@freeslot
792          ||
793          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
794   {
795     last THISROUND if $main::please_freeze;
796     if ($main::please_info)
797     {
798       $main::please_info = 0;
799       freeze();
800       collate_output();
801       save_meta(1);
802       update_progress_stats();
803     }
804     my $gotsome
805         = readfrompipes ()
806         + reapchildren ();
807     if (!$gotsome)
808     {
809       check_refresh_wanted();
810       check_squeue();
811       update_progress_stats();
812       select (undef, undef, undef, 0.1);
813     }
814     elsif (time - $progress_stats_updated >= 30)
815     {
816       update_progress_stats();
817     }
818     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
819         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
820     {
821       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
822           .($thisround_failed+$thisround_succeeded)
823           .") -- giving up on this round";
824       Log (undef, $message);
825       last THISROUND;
826     }
827
828     # move slots from freeslot to holdslot (or back to freeslot) if necessary
829     for (my $i=$#freeslot; $i>=0; $i--) {
830       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
831         push @holdslot, (splice @freeslot, $i, 1);
832       }
833     }
834     for (my $i=$#holdslot; $i>=0; $i--) {
835       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
836         push @freeslot, (splice @holdslot, $i, 1);
837       }
838     }
839
840     # give up if no nodes are succeeding
841     if (!grep { $_->{node}->{losing_streak} == 0 &&
842                     $_->{node}->{hold_count} < 4 } @slot) {
843       my $message = "Every node has failed -- giving up on this round";
844       Log (undef, $message);
845       last THISROUND;
846     }
847   }
848 }
849
850
851 push @freeslot, splice @holdslot;
852 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
853
854
855 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
856 while (%proc)
857 {
858   if ($main::please_continue) {
859     $main::please_continue = 0;
860     goto THISROUND;
861   }
862   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
863   readfrompipes ();
864   if (!reapchildren())
865   {
866     check_refresh_wanted();
867     check_squeue();
868     update_progress_stats();
869     select (undef, undef, undef, 0.1);
870     killem (keys %proc) if $main::please_freeze;
871   }
872 }
873
874 update_progress_stats();
875 freeze_if_want_freeze();
876
877
878 if (!defined $main::success)
879 {
880   if (@jobstep_todo &&
881       $thisround_succeeded == 0 &&
882       ($thisround_failed == 0 || $thisround_failed > 4))
883   {
884     my $message = "stop because $thisround_failed tasks failed and none succeeded";
885     Log (undef, $message);
886     $main::success = 0;
887   }
888   if (!@jobstep_todo)
889   {
890     $main::success = 1;
891   }
892 }
893
894 goto ONELEVEL if !defined $main::success;
895
896
897 release_allocation();
898 freeze();
899 my $collated_output = &collate_output();
900
901 if (!$collated_output) {
902   Log(undef, "output undef");
903 }
904 else {
905   eval {
906     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
907         or die "failed to get collated manifest: $!";
908     my $orig_manifest_text = '';
909     while (my $manifest_line = <$orig_manifest>) {
910       $orig_manifest_text .= $manifest_line;
911     }
912     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
913       'manifest_text' => $orig_manifest_text,
914     });
915     Log(undef, "output uuid " . $output->{uuid});
916     Log(undef, "output hash " . $output->{portable_data_hash});
917     $Job->update_attributes('output' => $output->{portable_data_hash});
918   };
919   if ($@) {
920     Log (undef, "Failed to register output manifest: $@");
921   }
922 }
923
924 Log (undef, "finish");
925
926 save_meta();
927
928 my $final_state;
929 if ($collated_output && $main::success) {
930   $final_state = 'Complete';
931 } else {
932   $final_state = 'Failed';
933 }
934 $Job->update_attributes('state' => $final_state);
935
936 exit (($final_state eq 'Complete') ? 0 : 1);
937
938
939
940 sub update_progress_stats
941 {
942   $progress_stats_updated = time;
943   return if !$progress_is_dirty;
944   my ($todo, $done, $running) = (scalar @jobstep_todo,
945                                  scalar @jobstep_done,
946                                  scalar @slot - scalar @freeslot - scalar @holdslot);
947   $Job->{'tasks_summary'} ||= {};
948   $Job->{'tasks_summary'}->{'todo'} = $todo;
949   $Job->{'tasks_summary'}->{'done'} = $done;
950   $Job->{'tasks_summary'}->{'running'} = $running;
951   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
952   Log (undef, "status: $done done, $running running, $todo todo");
953   $progress_is_dirty = 0;
954 }
955
956
957
958 sub reapchildren
959 {
960   my $pid = waitpid (-1, WNOHANG);
961   return 0 if $pid <= 0;
962
963   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
964                   . "."
965                   . $slot[$proc{$pid}->{slot}]->{cpu});
966   my $jobstepid = $proc{$pid}->{jobstep};
967   my $elapsed = time - $proc{$pid}->{time};
968   my $Jobstep = $jobstep[$jobstepid];
969
970   my $childstatus = $?;
971   my $exitvalue = $childstatus >> 8;
972   my $exitinfo = "exit ".exit_status_s($childstatus);
973   $Jobstep->{'arvados_task'}->reload;
974   my $task_success = $Jobstep->{'arvados_task'}->{success};
975
976   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
977
978   if (!defined $task_success) {
979     # task did not indicate one way or the other --> fail
980     $Jobstep->{'arvados_task'}->{success} = 0;
981     $Jobstep->{'arvados_task'}->save;
982     $task_success = 0;
983   }
984
985   if (!$task_success)
986   {
987     my $temporary_fail;
988     $temporary_fail ||= $Jobstep->{node_fail};
989     $temporary_fail ||= ($exitvalue == 111);
990
991     ++$thisround_failed;
992     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
993
994     # Check for signs of a failed or misconfigured node
995     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
996         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
997       # Don't count this against jobstep failure thresholds if this
998       # node is already suspected faulty and srun exited quickly
999       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1000           $elapsed < 5) {
1001         Log ($jobstepid, "blaming failure on suspect node " .
1002              $slot[$proc{$pid}->{slot}]->{node}->{name});
1003         $temporary_fail ||= 1;
1004       }
1005       ban_node_by_slot($proc{$pid}->{slot});
1006     }
1007
1008     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1009                              ++$Jobstep->{'failures'},
1010                              $temporary_fail ? 'temporary ' : 'permanent',
1011                              $elapsed));
1012
1013     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1014       # Give up on this task, and the whole job
1015       $main::success = 0;
1016       $main::please_freeze = 1;
1017     }
1018     # Put this task back on the todo queue
1019     push @jobstep_todo, $jobstepid;
1020     $Job->{'tasks_summary'}->{'failed'}++;
1021   }
1022   else
1023   {
1024     ++$thisround_succeeded;
1025     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1026     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1027     push @jobstep_done, $jobstepid;
1028     Log ($jobstepid, "success in $elapsed seconds");
1029   }
1030   $Jobstep->{exitcode} = $childstatus;
1031   $Jobstep->{finishtime} = time;
1032   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1033   $Jobstep->{'arvados_task'}->save;
1034   process_stderr ($jobstepid, $task_success);
1035   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1036
1037   close $reader{$jobstepid};
1038   delete $reader{$jobstepid};
1039   delete $slot[$proc{$pid}->{slot}]->{pid};
1040   push @freeslot, $proc{$pid}->{slot};
1041   delete $proc{$pid};
1042
1043   if ($task_success) {
1044     # Load new tasks
1045     my $newtask_list = [];
1046     my $newtask_results;
1047     do {
1048       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1049         'where' => {
1050           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1051         },
1052         'order' => 'qsequence',
1053         'offset' => scalar(@$newtask_list),
1054       );
1055       push(@$newtask_list, @{$newtask_results->{items}});
1056     } while (@{$newtask_results->{items}});
1057     foreach my $arvados_task (@$newtask_list) {
1058       my $jobstep = {
1059         'level' => $arvados_task->{'sequence'},
1060         'failures' => 0,
1061         'arvados_task' => $arvados_task
1062       };
1063       push @jobstep, $jobstep;
1064       push @jobstep_todo, $#jobstep;
1065     }
1066   }
1067
1068   $progress_is_dirty = 1;
1069   1;
1070 }
1071
1072 sub check_refresh_wanted
1073 {
1074   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1075   if (@stat && $stat[9] > $latest_refresh) {
1076     $latest_refresh = scalar time;
1077     my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1078     for my $attr ('cancelled_at',
1079                   'cancelled_by_user_uuid',
1080                   'cancelled_by_client_uuid',
1081                   'state') {
1082       $Job->{$attr} = $Job2->{$attr};
1083     }
1084     if ($Job->{'state'} ne "Running") {
1085       if ($Job->{'state'} eq "Cancelled") {
1086         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1087       } else {
1088         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1089       }
1090       $main::success = 0;
1091       $main::please_freeze = 1;
1092     }
1093   }
1094 }
1095
1096 sub check_squeue
1097 {
1098   # return if the kill list was checked <4 seconds ago
1099   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1100   {
1101     return;
1102   }
1103   $squeue_kill_checked = time;
1104
1105   # use killem() on procs whose killtime is reached
1106   for (keys %proc)
1107   {
1108     if (exists $proc{$_}->{killtime}
1109         && $proc{$_}->{killtime} <= time)
1110     {
1111       killem ($_);
1112     }
1113   }
1114
1115   # return if the squeue was checked <60 seconds ago
1116   if (defined $squeue_checked && $squeue_checked > time - 60)
1117   {
1118     return;
1119   }
1120   $squeue_checked = time;
1121
1122   if (!$have_slurm)
1123   {
1124     # here is an opportunity to check for mysterious problems with local procs
1125     return;
1126   }
1127
1128   # get a list of steps still running
1129   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1130   chop @squeue;
1131   if ($squeue[-1] ne "ok")
1132   {
1133     return;
1134   }
1135   pop @squeue;
1136
1137   # which of my jobsteps are running, according to squeue?
1138   my %ok;
1139   foreach (@squeue)
1140   {
1141     if (/^(\d+)\.(\d+) (\S+)/)
1142     {
1143       if ($1 eq $ENV{SLURM_JOBID})
1144       {
1145         $ok{$3} = 1;
1146       }
1147     }
1148   }
1149
1150   # which of my active child procs (>60s old) were not mentioned by squeue?
1151   foreach (keys %proc)
1152   {
1153     if ($proc{$_}->{time} < time - 60
1154         && !exists $ok{$proc{$_}->{jobstepname}}
1155         && !exists $proc{$_}->{killtime})
1156     {
1157       # kill this proc if it hasn't exited in 30 seconds
1158       $proc{$_}->{killtime} = time + 30;
1159     }
1160   }
1161 }
1162
1163
1164 sub release_allocation
1165 {
1166   if ($have_slurm)
1167   {
1168     Log (undef, "release job allocation");
1169     system "scancel $ENV{SLURM_JOBID}";
1170   }
1171 }
1172
1173
1174 sub readfrompipes
1175 {
1176   my $gotsome = 0;
1177   foreach my $job (keys %reader)
1178   {
1179     my $buf;
1180     while (0 < sysread ($reader{$job}, $buf, 8192))
1181     {
1182       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1183       $jobstep[$job]->{stderr} .= $buf;
1184       preprocess_stderr ($job);
1185       if (length ($jobstep[$job]->{stderr}) > 16384)
1186       {
1187         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1188       }
1189       $gotsome = 1;
1190     }
1191   }
1192   return $gotsome;
1193 }
1194
1195
1196 sub preprocess_stderr
1197 {
1198   my $job = shift;
1199
1200   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1201     my $line = $1;
1202     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1203     Log ($job, "stderr $line");
1204     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1205       # whoa.
1206       $main::please_freeze = 1;
1207     }
1208     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1209       $jobstep[$job]->{node_fail} = 1;
1210       ban_node_by_slot($jobstep[$job]->{slotindex});
1211     }
1212   }
1213 }
1214
1215
1216 sub process_stderr
1217 {
1218   my $job = shift;
1219   my $task_success = shift;
1220   preprocess_stderr ($job);
1221
1222   map {
1223     Log ($job, "stderr $_");
1224   } split ("\n", $jobstep[$job]->{stderr});
1225 }
1226
1227 sub fetch_block
1228 {
1229   my $hash = shift;
1230   my ($keep, $child_out, $output_block);
1231
1232   my $cmd = "arv-get \Q$hash\E";
1233   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1234   $output_block = '';
1235   while (1) {
1236     my $buf;
1237     my $bytes = sysread($keep, $buf, 1024 * 1024);
1238     if (!defined $bytes) {
1239       die "reading from arv-get: $!";
1240     } elsif ($bytes == 0) {
1241       # sysread returns 0 at the end of the pipe.
1242       last;
1243     } else {
1244       # some bytes were read into buf.
1245       $output_block .= $buf;
1246     }
1247   }
1248   close $keep;
1249   return $output_block;
1250 }
1251
1252 sub collate_output
1253 {
1254   Log (undef, "collate");
1255
1256   my ($child_out, $child_in);
1257   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1258                   '--retries', put_retry_count());
1259   my $joboutput;
1260   for (@jobstep)
1261   {
1262     next if (!exists $_->{'arvados_task'}->{'output'} ||
1263              !$_->{'arvados_task'}->{'success'});
1264     my $output = $_->{'arvados_task'}->{output};
1265     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1266     {
1267       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1268       print $child_in $output;
1269     }
1270     elsif (@jobstep == 1)
1271     {
1272       $joboutput = $output;
1273       last;
1274     }
1275     elsif (defined (my $outblock = fetch_block ($output)))
1276     {
1277       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1278       print $child_in $outblock;
1279     }
1280     else
1281     {
1282       Log (undef, "XXX fetch_block($output) failed XXX");
1283       $main::success = 0;
1284     }
1285   }
1286   $child_in->close;
1287
1288   if (!defined $joboutput) {
1289     my $s = IO::Select->new($child_out);
1290     if ($s->can_read(120)) {
1291       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1292       chomp($joboutput);
1293       # TODO: Ensure exit status == 0.
1294     } else {
1295       Log (undef, "timed out reading from 'arv-put'");
1296     }
1297   }
1298   # TODO: kill $pid instead of waiting, now that we've decided to
1299   # ignore further output.
1300   waitpid($pid, 0);
1301
1302   return $joboutput;
1303 }
1304
1305
1306 sub killem
1307 {
1308   foreach (@_)
1309   {
1310     my $sig = 2;                # SIGINT first
1311     if (exists $proc{$_}->{"sent_$sig"} &&
1312         time - $proc{$_}->{"sent_$sig"} > 4)
1313     {
1314       $sig = 15;                # SIGTERM if SIGINT doesn't work
1315     }
1316     if (exists $proc{$_}->{"sent_$sig"} &&
1317         time - $proc{$_}->{"sent_$sig"} > 4)
1318     {
1319       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1320     }
1321     if (!exists $proc{$_}->{"sent_$sig"})
1322     {
1323       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1324       kill $sig, $_;
1325       select (undef, undef, undef, 0.1);
1326       if ($sig == 2)
1327       {
1328         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1329       }
1330       $proc{$_}->{"sent_$sig"} = time;
1331       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1332     }
1333   }
1334 }
1335
1336
1337 sub fhbits
1338 {
1339   my($bits);
1340   for (@_) {
1341     vec($bits,fileno($_),1) = 1;
1342   }
1343   $bits;
1344 }
1345
1346
1347 # Send log output to Keep via arv-put.
1348 #
1349 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1350 # $log_pipe_pid is the pid of the arv-put subprocess.
1351 #
1352 # The only functions that should access these variables directly are:
1353 #
1354 # log_writer_start($logfilename)
1355 #     Starts an arv-put pipe, reading data on stdin and writing it to
1356 #     a $logfilename file in an output collection.
1357 #
1358 # log_writer_send($txt)
1359 #     Writes $txt to the output log collection.
1360 #
1361 # log_writer_finish()
1362 #     Closes the arv-put pipe and returns the output that it produces.
1363 #
1364 # log_writer_is_active()
1365 #     Returns a true value if there is currently a live arv-put
1366 #     process, false otherwise.
1367 #
1368 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1369
1370 sub log_writer_start($)
1371 {
1372   my $logfilename = shift;
1373   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1374                         'arv-put', '--portable-data-hash',
1375                         '--retries', '3',
1376                         '--filename', $logfilename,
1377                         '-');
1378 }
1379
1380 sub log_writer_send($)
1381 {
1382   my $txt = shift;
1383   print $log_pipe_in $txt;
1384 }
1385
1386 sub log_writer_finish()
1387 {
1388   return unless $log_pipe_pid;
1389
1390   close($log_pipe_in);
1391   my $arv_put_output;
1392
1393   my $s = IO::Select->new($log_pipe_out);
1394   if ($s->can_read(120)) {
1395     sysread($log_pipe_out, $arv_put_output, 1024);
1396     chomp($arv_put_output);
1397   } else {
1398     Log (undef, "timed out reading from 'arv-put'");
1399   }
1400
1401   waitpid($log_pipe_pid, 0);
1402   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1403   if ($?) {
1404     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1405   }
1406
1407   return $arv_put_output;
1408 }
1409
1410 sub log_writer_is_active() {
1411   return $log_pipe_pid;
1412 }
1413
1414 sub Log                         # ($jobstep_id, $logmessage)
1415 {
1416   if ($_[1] =~ /\n/) {
1417     for my $line (split (/\n/, $_[1])) {
1418       Log ($_[0], $line);
1419     }
1420     return;
1421   }
1422   my $fh = select STDERR; $|=1; select $fh;
1423   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1424   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1425   $message .= "\n";
1426   my $datetime;
1427   if (log_writer_is_active() || -t STDERR) {
1428     my @gmtime = gmtime;
1429     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1430                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1431   }
1432   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1433
1434   if (log_writer_is_active()) {
1435     log_writer_send($datetime . " " . $message);
1436   }
1437 }
1438
1439
1440 sub croak
1441 {
1442   my ($package, $file, $line) = caller;
1443   my $message = "@_ at $file line $line\n";
1444   Log (undef, $message);
1445   freeze() if @jobstep_todo;
1446   collate_output() if @jobstep_todo;
1447   cleanup();
1448   save_meta();
1449   die;
1450 }
1451
1452
1453 sub cleanup
1454 {
1455   return unless $Job;
1456   if ($Job->{'state'} eq 'Cancelled') {
1457     $Job->update_attributes('finished_at' => scalar gmtime);
1458   } else {
1459     $Job->update_attributes('state' => 'Failed');
1460   }
1461 }
1462
1463
1464 sub save_meta
1465 {
1466   my $justcheckpoint = shift; # false if this will be the last meta saved
1467   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1468   return unless log_writer_is_active();
1469
1470   my $loglocator = log_writer_finish();
1471   Log (undef, "log manifest is $loglocator");
1472   $Job->{'log'} = $loglocator;
1473   $Job->update_attributes('log', $loglocator);
1474 }
1475
1476
1477 sub freeze_if_want_freeze
1478 {
1479   if ($main::please_freeze)
1480   {
1481     release_allocation();
1482     if (@_)
1483     {
1484       # kill some srun procs before freeze+stop
1485       map { $proc{$_} = {} } @_;
1486       while (%proc)
1487       {
1488         killem (keys %proc);
1489         select (undef, undef, undef, 0.1);
1490         my $died;
1491         while (($died = waitpid (-1, WNOHANG)) > 0)
1492         {
1493           delete $proc{$died};
1494         }
1495       }
1496     }
1497     freeze();
1498     collate_output();
1499     cleanup();
1500     save_meta();
1501     exit 1;
1502   }
1503 }
1504
1505
1506 sub freeze
1507 {
1508   Log (undef, "Freeze not implemented");
1509   return;
1510 }
1511
1512
1513 sub thaw
1514 {
1515   croak ("Thaw not implemented");
1516 }
1517
1518
1519 sub freezequote
1520 {
1521   my $s = shift;
1522   $s =~ s/\\/\\\\/g;
1523   $s =~ s/\n/\\n/g;
1524   return $s;
1525 }
1526
1527
1528 sub freezeunquote
1529 {
1530   my $s = shift;
1531   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1532   return $s;
1533 }
1534
1535
1536 sub srun
1537 {
1538   my $srunargs = shift;
1539   my $execargs = shift;
1540   my $opts = shift || {};
1541   my $stdin = shift;
1542   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1543   print STDERR (join (" ",
1544                       map { / / ? "'$_'" : $_ }
1545                       (@$args)),
1546                 "\n")
1547       if $ENV{CRUNCH_DEBUG};
1548
1549   if (defined $stdin) {
1550     my $child = open STDIN, "-|";
1551     defined $child or die "no fork: $!";
1552     if ($child == 0) {
1553       print $stdin or die $!;
1554       close STDOUT or die $!;
1555       exit 0;
1556     }
1557   }
1558
1559   return system (@$args) if $opts->{fork};
1560
1561   exec @$args;
1562   warn "ENV size is ".length(join(" ",%ENV));
1563   die "exec failed: $!: @$args";
1564 }
1565
1566
1567 sub ban_node_by_slot {
1568   # Don't start any new jobsteps on this node for 60 seconds
1569   my $slotid = shift;
1570   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1571   $slot[$slotid]->{node}->{hold_count}++;
1572   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1573 }
1574
1575 sub must_lock_now
1576 {
1577   my ($lockfile, $error_message) = @_;
1578   open L, ">", $lockfile or croak("$lockfile: $!");
1579   if (!flock L, LOCK_EX|LOCK_NB) {
1580     croak("Can't lock $lockfile: $error_message\n");
1581   }
1582 }
1583
1584 sub find_docker_image {
1585   # Given a Keep locator, check to see if it contains a Docker image.
1586   # If so, return its stream name and Docker hash.
1587   # If not, return undef for both values.
1588   my $locator = shift;
1589   my ($streamname, $filename);
1590   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1591     foreach my $line (split(/\n/, $image->{manifest_text})) {
1592       my @tokens = split(/\s+/, $line);
1593       next if (!@tokens);
1594       $streamname = shift(@tokens);
1595       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1596         if (defined($filename)) {
1597           return (undef, undef);  # More than one file in the Collection.
1598         } else {
1599           $filename = (split(/:/, $filedata, 3))[2];
1600         }
1601       }
1602     }
1603   }
1604   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1605     return ($streamname, $1);
1606   } else {
1607     return (undef, undef);
1608   }
1609 }
1610
1611 sub put_retry_count {
1612   # Calculate a --retries argument for arv-put that will have it try
1613   # approximately as long as this Job has been running.
1614   my $stoptime = shift || time;
1615   my $starttime = $jobstep[0]->{starttime};
1616   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1617   my $retries = 0;
1618   while ($timediff >= 2) {
1619     $retries++;
1620     $timediff /= 2;
1621   }
1622   return ($retries > 3) ? $retries : 3;
1623 }
1624
1625 sub exit_status_s {
1626   # Given a $?, return a human-readable exit code string like "0" or
1627   # "1" or "0 with signal 1" or "1 with signal 11".
1628   my $exitcode = shift;
1629   my $s = $exitcode >> 8;
1630   if ($exitcode & 0x7f) {
1631     $s .= " with signal " . ($exitcode & 0x7f);
1632   }
1633   if ($exitcode & 0x80) {
1634     $s .= " with core dump";
1635   }
1636   return $s;
1637 }
1638
1639 __DATA__
1640 #!/usr/bin/perl
1641
1642 # checkout-and-build
1643
1644 use Fcntl ':flock';
1645 use File::Path qw( make_path );
1646
1647 my $destdir = $ENV{"CRUNCH_SRC"};
1648 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1649 my $repo = $ENV{"CRUNCH_SRC_URL"};
1650 my $task_work = $ENV{"TASK_WORK"};
1651
1652 for my $dir ($destdir, $task_work) {
1653     if ($dir) {
1654         make_path $dir;
1655         -e $dir or die "Failed to create temporary directory ($dir): $!";
1656     }
1657 }
1658
1659 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1660 flock L, LOCK_EX;
1661 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1662     if (@ARGV) {
1663         exec(@ARGV);
1664         die "Cannot exec `@ARGV`: $!";
1665     } else {
1666         exit 0;
1667     }
1668 }
1669
1670 unlink "$destdir.commit";
1671 open STDOUT, ">", "$destdir.log";
1672 open STDERR, ">&STDOUT";
1673
1674 mkdir $destdir;
1675 my @git_archive_data = <DATA>;
1676 if (@git_archive_data) {
1677   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1678   print TARX @git_archive_data;
1679   if(!close(TARX)) {
1680     die "'tar -C $destdir -xf -' exited $?: $!";
1681   }
1682 }
1683
1684 my $pwd;
1685 chomp ($pwd = `pwd`);
1686 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1687 mkdir $install_dir;
1688
1689 for my $src_path ("$destdir/arvados/sdk/python") {
1690   if (-d $src_path) {
1691     shell_or_die ("virtualenv", $install_dir);
1692     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1693   }
1694 }
1695
1696 if (-e "$destdir/crunch_scripts/install") {
1697     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1698 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1699     # Old version
1700     shell_or_die ("./tests/autotests.sh", $install_dir);
1701 } elsif (-e "./install.sh") {
1702     shell_or_die ("./install.sh", $install_dir);
1703 }
1704
1705 if ($commit) {
1706     unlink "$destdir.commit.new";
1707     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1708     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1709 }
1710
1711 close L;
1712
1713 if (@ARGV) {
1714     exec(@ARGV);
1715     die "Cannot exec `@ARGV`: $!";
1716 } else {
1717     exit 0;
1718 }
1719
1720 sub shell_or_die
1721 {
1722   if ($ENV{"DEBUG"}) {
1723     print STDERR "@_\n";
1724   }
1725   system (@_) == 0
1726       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1727 }
1728
1729 __DATA__