3775: Update comment
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Digest::MD5 qw(md5_hex);
90 use Getopt::Long;
91 use IPC::Open2;
92 use IO::Select;
93 use File::Temp;
94 use Fcntl ':flock';
95 use File::Path qw( make_path remove_tree );
96
97 use constant EX_TEMPFAIL => 75;
98
99 $ENV{"TMPDIR"} ||= "/tmp";
100 unless (defined $ENV{"CRUNCH_TMP"}) {
101   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
102   if ($ENV{"USER"} ne "crunch" && $< != 0) {
103     # use a tmp dir unique for my uid
104     $ENV{"CRUNCH_TMP"} .= "-$<";
105   }
106 }
107
108 # Create the tmp directory if it does not exist
109 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
110   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
111 }
112
113 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
114 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
115 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
116 mkdir ($ENV{"JOB_WORK"});
117
118 my $force_unlock;
119 my $git_dir;
120 my $jobspec;
121 my $job_api_token;
122 my $no_clear_tmp;
123 my $resume_stash;
124 GetOptions('force-unlock' => \$force_unlock,
125            'git-dir=s' => \$git_dir,
126            'job=s' => \$jobspec,
127            'job-api-token=s' => \$job_api_token,
128            'no-clear-tmp' => \$no_clear_tmp,
129            'resume-stash=s' => \$resume_stash,
130     );
131
132 if (defined $job_api_token) {
133   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
134 }
135
136 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
137 my $local_job = 0;
138
139
140 $SIG{'USR1'} = sub
141 {
142   $main::ENV{CRUNCH_DEBUG} = 1;
143 };
144 $SIG{'USR2'} = sub
145 {
146   $main::ENV{CRUNCH_DEBUG} = 0;
147 };
148
149
150
151 my $arv = Arvados->new('apiVersion' => 'v1');
152
153 my $User = $arv->{'users'}->{'current'}->execute;
154
155 my $Job;
156 my $job_id;
157 my $dbh;
158 my $sth;
159 if ($jobspec =~ /^[-a-z\d]+$/)
160 {
161   # $jobspec is an Arvados UUID, not a JSON job specification
162   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
163   if (!$force_unlock) {
164     # Claim this job, and make sure nobody else does
165     eval {
166       # lock() sets is_locked_by_uuid and changes state to Running.
167       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
168     };
169     if ($@) {
170       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
171       exit EX_TEMPFAIL;
172     };
173   }
174 }
175 else
176 {
177   $Job = JSON::decode_json($jobspec);
178
179   if (!$resume_stash)
180   {
181     map { croak ("No $_ specified") unless $Job->{$_} }
182     qw(script script_version script_parameters);
183   }
184
185   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186   $Job->{'started_at'} = gmtime;
187   $Job->{'state'} = 'Running';
188
189   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
190 }
191 $job_id = $Job->{'uuid'};
192
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
195
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
199
200
201 Log (undef, "check slurm allocation");
202 my @slot;
203 my @node;
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
205 my @sinfo;
206 if (!$have_slurm)
207 {
208   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209   push @sinfo, "$localcpus localhost";
210 }
211 if (exists $ENV{SLURM_NODELIST})
212 {
213   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
214 }
215 foreach (@sinfo)
216 {
217   my ($ncpus, $slurm_nodelist) = split;
218   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
219
220   my @nodelist;
221   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
222   {
223     my $nodelist = $1;
224     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
225     {
226       my $ranges = $1;
227       foreach (split (",", $ranges))
228       {
229         my ($a, $b);
230         if (/(\d+)-(\d+)/)
231         {
232           $a = $1;
233           $b = $2;
234         }
235         else
236         {
237           $a = $_;
238           $b = $_;
239         }
240         push @nodelist, map {
241           my $n = $nodelist;
242           $n =~ s/\[[-,\d]+\]/$_/;
243           $n;
244         } ($a..$b);
245       }
246     }
247     else
248     {
249       push @nodelist, $nodelist;
250     }
251   }
252   foreach my $nodename (@nodelist)
253   {
254     Log (undef, "node $nodename - $ncpus slots");
255     my $node = { name => $nodename,
256                  ncpus => $ncpus,
257                  losing_streak => 0,
258                  hold_until => 0 };
259     foreach my $cpu (1..$ncpus)
260     {
261       push @slot, { node => $node,
262                     cpu => $cpu };
263     }
264   }
265   push @node, @nodelist;
266 }
267
268
269
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
272
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274
275
276 $Job->update_attributes(
277   'tasks_summary' => { 'failed' => 0,
278                        'todo' => 1,
279                        'running' => 0,
280                        'done' => 0 });
281
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
296
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
300
301
302 my @jobstep;
303 my @jobstep_todo = ();
304 my @jobstep_done = ();
305 my @jobstep_tomerge = ();
306 my $jobstep_tomerge_level = 0;
307 my $squeue_checked;
308 my $squeue_kill_checked;
309 my $output_in_keep = 0;
310 my $latest_refresh = scalar time;
311
312
313
314 if (defined $Job->{thawedfromkey})
315 {
316   thaw ($Job->{thawedfromkey});
317 }
318 else
319 {
320   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
321     'job_uuid' => $Job->{'uuid'},
322     'sequence' => 0,
323     'qsequence' => 0,
324     'parameters' => {},
325                                                           });
326   push @jobstep, { 'level' => 0,
327                    'failures' => 0,
328                    'arvados_task' => $first_task,
329                  };
330   push @jobstep_todo, 0;
331 }
332
333
334 if (!$have_slurm)
335 {
336   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
337 }
338
339
340 my $build_script;
341 do {
342   local $/ = undef;
343   $build_script = <DATA>;
344 };
345 my $nodelist = join(",", @node);
346
347 if (!defined $no_clear_tmp) {
348   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
349   Log (undef, "Clean work dirs");
350
351   my $cleanpid = fork();
352   if ($cleanpid == 0)
353   {
354     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
355           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
356     exit (1);
357   }
358   while (1)
359   {
360     last if $cleanpid == waitpid (-1, WNOHANG);
361     freeze_if_want_freeze ($cleanpid);
362     select (undef, undef, undef, 0.1);
363   }
364   Log (undef, "Cleanup command exited ".exit_status_s($?));
365 }
366
367
368 my $git_archive;
369 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
370   # If script_version looks like an absolute path, *and* the --git-dir
371   # argument was not given -- which implies we were not invoked by
372   # crunch-dispatch -- we will use the given path as a working
373   # directory instead of resolving script_version to a git commit (or
374   # doing anything else with git).
375   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
376   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
377 }
378 else {
379   # Resolve the given script_version to a git commit sha1. Also, if
380   # the repository is remote, clone it into our local filesystem: this
381   # ensures "git archive" will work, and is necessary to reliably
382   # resolve a symbolic script_version like "master^".
383   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
384
385   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
386
387   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
388
389   # If we're running under crunch-dispatch, it will have already
390   # pulled the appropriate source tree into its own repository, and
391   # given us that repo's path as $git_dir.
392   #
393   # If we're running a "local" job, we might have to fetch content
394   # from a remote repository.
395   #
396   # (Currently crunch-dispatch gives a local path with --git-dir, but
397   # we might as well accept URLs there too in case it changes its
398   # mind.)
399   my $repo = $git_dir || $Job->{'repository'};
400
401   # Repository can be remote or local. If remote, we'll need to fetch it
402   # to a local dir before doing `git log` et al.
403   my $repo_location;
404
405   if ($repo =~ m{://|^[^/]*:}) {
406     # $repo is a git url we can clone, like git:// or https:// or
407     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
408     # not recognized here because distinguishing that from a local
409     # path is too fragile. If you really need something strange here,
410     # use the ssh:// form.
411     $repo_location = 'remote';
412   } elsif ($repo =~ m{^\.*/}) {
413     # $repo is a local path to a git index. We'll also resolve ../foo
414     # to ../foo/.git if the latter is a directory. To help
415     # disambiguate local paths from named hosted repositories, this
416     # form must be given as ./ or ../ if it's a relative path.
417     if (-d "$repo/.git") {
418       $repo = "$repo/.git";
419     }
420     $repo_location = 'local';
421   } else {
422     # $repo is none of the above. It must be the name of a hosted
423     # repository.
424     my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
425       'filters' => [['name','=',$repo]]
426         );
427     my @repos_found = @{$arv_repo_list->{'items'}};
428     my $n_found = $arv_repo_list->{'items_available'};
429     if ($n_found > 0) {
430       Log(undef, "Repository '$repo' -> "
431           . join(", ", map { $_->{'uuid'} } @repos_found));
432     }
433     if ($n_found != 1) {
434       croak("Error: Found $n_found repositories with name '$repo'.");
435     }
436     $repo = $repos_found[0]->{'fetch_url'};
437     $repo_location = 'remote';
438   }
439   Log(undef, "Using $repo_location repository '$repo'");
440   $ENV{"CRUNCH_SRC_URL"} = $repo;
441
442   # Resolve given script_version (we'll call that $treeish here) to a
443   # commit sha1 ($commit).
444   my $treeish = $Job->{'script_version'};
445   my $commit;
446   if ($repo_location eq 'remote') {
447     # We minimize excess object-fetching by re-using the same bare
448     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
449     # just keep adding remotes to it as needed.
450     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
451     my $gitcmd = "git --git-dir=\Q$local_repo\E";
452
453     # Set up our local repo for caching remote objects, making
454     # archives, etc.
455     if (!-d $local_repo) {
456       make_path($local_repo) or croak("Error: could not create $local_repo");
457     }
458     # This works (exits 0 and doesn't delete fetched objects) even
459     # if $local_repo is already initialized:
460     `$gitcmd init --bare`;
461     if ($?) {
462       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
463     }
464
465     # If $treeish looks like a hash (or abbrev hash) we look it up in
466     # our local cache first, since that's cheaper. (We don't want to
467     # do that with tags/branches though -- those change over time, so
468     # they should always be resolved by the remote repo.)
469     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
470       # Hide stderr because it's normal for this to fail:
471       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
472       if ($? == 0 &&
473           # Careful not to resolve a branch named abcdeff to commit 1234567:
474           $sha1 =~ /^$treeish/ &&
475           $sha1 =~ /^([0-9a-f]{40})$/s) {
476         $commit = $1;
477         Log(undef, "Commit $commit already present in $local_repo");
478       }
479     }
480
481     if (!defined $commit) {
482       # If $treeish isn't just a hash or abbrev hash, or isn't here
483       # yet, we need to fetch the remote to resolve it correctly.
484
485       # First, remove all local heads. This prevents a name that does
486       # not exist on the remote from resolving to (or colliding with)
487       # a previously fetched branch or tag (possibly from a different
488       # remote).
489       remove_tree("$local_repo/refs/heads", {keep_root => 1});
490
491       Log(undef, "Fetching objects from $repo to $local_repo");
492       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
493       if ($?) {
494         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
495       }
496     }
497
498     # Now that the data is all here, we will use our local repo for
499     # the rest of our git activities.
500     $repo = $local_repo;
501   }
502
503   my $gitcmd = "git --git-dir=\Q$repo\E";
504   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
505   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
506     croak("`$gitcmd rev-list` exited "
507           .exit_status_s($?)
508           .", '$treeish' not found. Giving up.");
509   }
510   $commit = $1;
511   Log(undef, "Version $treeish is commit $commit");
512
513   if ($commit ne $Job->{'script_version'}) {
514     # Record the real commit id in the database, frozentokey, logs,
515     # etc. -- instead of an abbreviation or a branch name which can
516     # become ambiguous or point to a different commit in the future.
517     if (!$Job->update_attributes('script_version' => $commit)) {
518       croak("Error: failed to update job's script_version attribute");
519     }
520   }
521
522   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
523   $git_archive = `$gitcmd archive ''\Q$commit\E`;
524   if ($?) {
525     croak("Error: $gitcmd archive exited ".exit_status_s($?));
526   }
527 }
528
529 if (!defined $git_archive) {
530   Log(undef, "Skip install phase (no git archive)");
531   if ($have_slurm) {
532     Log(undef, "Warning: This probably means workers have no source tree!");
533   }
534 }
535 else {
536   Log(undef, "Run install script on all workers");
537
538   my @srunargs = ("srun",
539                   "--nodelist=$nodelist",
540                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
541   my @execargs = ("sh", "-c",
542                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
543
544   # Note: this section is almost certainly unnecessary if we're
545   # running tasks in docker containers.
546   my $installpid = fork();
547   if ($installpid == 0)
548   {
549     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
550     exit (1);
551   }
552   while (1)
553   {
554     last if $installpid == waitpid (-1, WNOHANG);
555     freeze_if_want_freeze ($installpid);
556     select (undef, undef, undef, 0.1);
557   }
558   Log (undef, "Install script exited ".exit_status_s($?));
559 }
560
561 if (!$have_slurm)
562 {
563   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
564   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
565 }
566
567 # If this job requires a Docker image, install that.
568 my $docker_bin = "/usr/bin/docker.io";
569 my ($docker_locator, $docker_stream, $docker_hash);
570 if ($docker_locator = $Job->{docker_image_locator}) {
571   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
572   if (!$docker_hash)
573   {
574     croak("No Docker image hash found from locator $docker_locator");
575   }
576   $docker_stream =~ s/^\.//;
577   my $docker_install_script = qq{
578 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
579     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
580 fi
581 };
582   my $docker_pid = fork();
583   if ($docker_pid == 0)
584   {
585     srun (["srun", "--nodelist=" . join(',', @node)],
586           ["/bin/sh", "-ec", $docker_install_script]);
587     exit ($?);
588   }
589   while (1)
590   {
591     last if $docker_pid == waitpid (-1, WNOHANG);
592     freeze_if_want_freeze ($docker_pid);
593     select (undef, undef, undef, 0.1);
594   }
595   if ($? != 0)
596   {
597     croak("Installing Docker image from $docker_locator exited "
598           .exit_status_s($?));
599   }
600 }
601
602 foreach (qw (script script_version script_parameters runtime_constraints))
603 {
604   Log (undef,
605        "$_ " .
606        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
607 }
608 foreach (split (/\n/, $Job->{knobs}))
609 {
610   Log (undef, "knob " . $_);
611 }
612
613
614
615 $main::success = undef;
616
617
618
619 ONELEVEL:
620
621 my $thisround_succeeded = 0;
622 my $thisround_failed = 0;
623 my $thisround_failed_multiple = 0;
624
625 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
626                        or $a <=> $b } @jobstep_todo;
627 my $level = $jobstep[$jobstep_todo[0]]->{level};
628 Log (undef, "start level $level");
629
630
631
632 my %proc;
633 my @freeslot = (0..$#slot);
634 my @holdslot;
635 my %reader;
636 my $progress_is_dirty = 1;
637 my $progress_stats_updated = 0;
638
639 update_progress_stats();
640
641
642
643 THISROUND:
644 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
645 {
646   my $id = $jobstep_todo[$todo_ptr];
647   my $Jobstep = $jobstep[$id];
648   if ($Jobstep->{level} != $level)
649   {
650     next;
651   }
652
653   pipe $reader{$id}, "writer" or croak ($!);
654   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
655   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
656
657   my $childslot = $freeslot[0];
658   my $childnode = $slot[$childslot]->{node};
659   my $childslotname = join (".",
660                             $slot[$childslot]->{node}->{name},
661                             $slot[$childslot]->{cpu});
662   my $childpid = fork();
663   if ($childpid == 0)
664   {
665     $SIG{'INT'} = 'DEFAULT';
666     $SIG{'QUIT'} = 'DEFAULT';
667     $SIG{'TERM'} = 'DEFAULT';
668
669     foreach (values (%reader))
670     {
671       close($_);
672     }
673     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
674     open(STDOUT,">&writer");
675     open(STDERR,">&writer");
676
677     undef $dbh;
678     undef $sth;
679
680     delete $ENV{"GNUPGHOME"};
681     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
682     $ENV{"TASK_QSEQUENCE"} = $id;
683     $ENV{"TASK_SEQUENCE"} = $level;
684     $ENV{"JOB_SCRIPT"} = $Job->{script};
685     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
686       $param =~ tr/a-z/A-Z/;
687       $ENV{"JOB_PARAMETER_$param"} = $value;
688     }
689     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
690     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
691     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
692     $ENV{"HOME"} = $ENV{"TASK_WORK"};
693     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
694     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
695     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
696     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
697
698     $ENV{"GZIP"} = "-n";
699
700     my @srunargs = (
701       "srun",
702       "--nodelist=".$childnode->{name},
703       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
704       "--job-name=$job_id.$id.$$",
705         );
706     my $build_script_to_send = "";
707     my $command =
708         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
709         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
710         ."&& cd $ENV{CRUNCH_TMP} ";
711     if ($build_script)
712     {
713       $build_script_to_send = $build_script;
714       $command .=
715           "&& perl -";
716     }
717     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
718     if ($docker_hash)
719     {
720       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
721       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
722       # Dynamically configure the container to use the host system as its
723       # DNS server.  Get the host's global addresses from the ip command,
724       # and turn them into docker --dns options using gawk.
725       $command .=
726           q{$(ip -o address show scope global |
727               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
728       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
729       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
730       $command .= "--env=\QHOME=/home/crunch\E ";
731       while (my ($env_key, $env_val) = each %ENV)
732       {
733         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
734           if ($env_key eq "TASK_WORK") {
735             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
736           }
737           elsif ($env_key eq "TASK_KEEPMOUNT") {
738             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
739           }
740           else {
741             $command .= "--env=\Q$env_key=$env_val\E ";
742           }
743         }
744       }
745       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
746       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
747       $command .= "\Q$docker_hash\E ";
748       $command .= "stdbuf --output=0 --error=0 ";
749       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
750     } else {
751       # Non-docker run
752       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
753       $command .= "stdbuf --output=0 --error=0 ";
754       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
755     }
756
757     my @execargs = ('bash', '-c', $command);
758     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
759     # exec() failed, we assume nothing happened.
760     Log(undef, "srun() failed on build script");
761     die;
762   }
763   close("writer");
764   if (!defined $childpid)
765   {
766     close $reader{$id};
767     delete $reader{$id};
768     next;
769   }
770   shift @freeslot;
771   $proc{$childpid} = { jobstep => $id,
772                        time => time,
773                        slot => $childslot,
774                        jobstepname => "$job_id.$id.$childpid",
775                      };
776   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
777   $slot[$childslot]->{pid} = $childpid;
778
779   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
780   Log ($id, "child $childpid started on $childslotname");
781   $Jobstep->{starttime} = time;
782   $Jobstep->{node} = $childnode->{name};
783   $Jobstep->{slotindex} = $childslot;
784   delete $Jobstep->{stderr};
785   delete $Jobstep->{finishtime};
786
787   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
788   $Jobstep->{'arvados_task'}->save;
789
790   splice @jobstep_todo, $todo_ptr, 1;
791   --$todo_ptr;
792
793   $progress_is_dirty = 1;
794
795   while (!@freeslot
796          ||
797          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
798   {
799     last THISROUND if $main::please_freeze;
800     if ($main::please_info)
801     {
802       $main::please_info = 0;
803       freeze();
804       collate_output();
805       save_meta(1);
806       update_progress_stats();
807     }
808     my $gotsome
809         = readfrompipes ()
810         + reapchildren ();
811     if (!$gotsome)
812     {
813       check_refresh_wanted();
814       check_squeue();
815       update_progress_stats();
816       select (undef, undef, undef, 0.1);
817     }
818     elsif (time - $progress_stats_updated >= 30)
819     {
820       update_progress_stats();
821     }
822     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
823         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
824     {
825       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
826           .($thisround_failed+$thisround_succeeded)
827           .") -- giving up on this round";
828       Log (undef, $message);
829       last THISROUND;
830     }
831
832     # move slots from freeslot to holdslot (or back to freeslot) if necessary
833     for (my $i=$#freeslot; $i>=0; $i--) {
834       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
835         push @holdslot, (splice @freeslot, $i, 1);
836       }
837     }
838     for (my $i=$#holdslot; $i>=0; $i--) {
839       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
840         push @freeslot, (splice @holdslot, $i, 1);
841       }
842     }
843
844     # give up if no nodes are succeeding
845     if (!grep { $_->{node}->{losing_streak} == 0 &&
846                     $_->{node}->{hold_count} < 4 } @slot) {
847       my $message = "Every node has failed -- giving up on this round";
848       Log (undef, $message);
849       last THISROUND;
850     }
851   }
852 }
853
854
855 push @freeslot, splice @holdslot;
856 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
857
858
859 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
860 while (%proc)
861 {
862   if ($main::please_continue) {
863     $main::please_continue = 0;
864     goto THISROUND;
865   }
866   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
867   readfrompipes ();
868   if (!reapchildren())
869   {
870     check_refresh_wanted();
871     check_squeue();
872     update_progress_stats();
873     select (undef, undef, undef, 0.1);
874     killem (keys %proc) if $main::please_freeze;
875   }
876 }
877
878 update_progress_stats();
879 freeze_if_want_freeze();
880
881
882 if (!defined $main::success)
883 {
884   if (@jobstep_todo &&
885       $thisround_succeeded == 0 &&
886       ($thisround_failed == 0 || $thisround_failed > 4))
887   {
888     my $message = "stop because $thisround_failed tasks failed and none succeeded";
889     Log (undef, $message);
890     $main::success = 0;
891   }
892   if (!@jobstep_todo)
893   {
894     $main::success = 1;
895   }
896 }
897
898 goto ONELEVEL if !defined $main::success;
899
900
901 release_allocation();
902 freeze();
903 my $collated_output = &collate_output();
904
905 if (!$collated_output) {
906   Log(undef, "output undef");
907 }
908 else {
909   eval {
910     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
911         or die "failed to get collated manifest: $!";
912     my $orig_manifest_text = '';
913     while (my $manifest_line = <$orig_manifest>) {
914       $orig_manifest_text .= $manifest_line;
915     }
916     my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
917       'manifest_text' => $orig_manifest_text,
918     });
919     Log(undef, "output uuid " . $output->{uuid});
920     Log(undef, "output hash " . $output->{portable_data_hash});
921     $Job->update_attributes('output' => $output->{portable_data_hash});
922   };
923   if ($@) {
924     Log (undef, "Failed to register output manifest: $@");
925   }
926 }
927
928 Log (undef, "finish");
929
930 save_meta();
931
932 my $final_state;
933 if ($collated_output && $main::success) {
934   $final_state = 'Complete';
935 } else {
936   $final_state = 'Failed';
937 }
938 $Job->update_attributes('state' => $final_state);
939
940 exit (($final_state eq 'Complete') ? 0 : 1);
941
942
943
944 sub update_progress_stats
945 {
946   $progress_stats_updated = time;
947   return if !$progress_is_dirty;
948   my ($todo, $done, $running) = (scalar @jobstep_todo,
949                                  scalar @jobstep_done,
950                                  scalar @slot - scalar @freeslot - scalar @holdslot);
951   $Job->{'tasks_summary'} ||= {};
952   $Job->{'tasks_summary'}->{'todo'} = $todo;
953   $Job->{'tasks_summary'}->{'done'} = $done;
954   $Job->{'tasks_summary'}->{'running'} = $running;
955   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
956   Log (undef, "status: $done done, $running running, $todo todo");
957   $progress_is_dirty = 0;
958 }
959
960
961
962 sub reapchildren
963 {
964   my $pid = waitpid (-1, WNOHANG);
965   return 0 if $pid <= 0;
966
967   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
968                   . "."
969                   . $slot[$proc{$pid}->{slot}]->{cpu});
970   my $jobstepid = $proc{$pid}->{jobstep};
971   my $elapsed = time - $proc{$pid}->{time};
972   my $Jobstep = $jobstep[$jobstepid];
973
974   my $childstatus = $?;
975   my $exitvalue = $childstatus >> 8;
976   my $exitinfo = "exit ".exit_status_s($childstatus);
977   $Jobstep->{'arvados_task'}->reload;
978   my $task_success = $Jobstep->{'arvados_task'}->{success};
979
980   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
981
982   if (!defined $task_success) {
983     # task did not indicate one way or the other --> fail
984     $Jobstep->{'arvados_task'}->{success} = 0;
985     $Jobstep->{'arvados_task'}->save;
986     $task_success = 0;
987   }
988
989   if (!$task_success)
990   {
991     my $temporary_fail;
992     $temporary_fail ||= $Jobstep->{node_fail};
993     $temporary_fail ||= ($exitvalue == 111);
994
995     ++$thisround_failed;
996     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
997
998     # Check for signs of a failed or misconfigured node
999     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1000         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1001       # Don't count this against jobstep failure thresholds if this
1002       # node is already suspected faulty and srun exited quickly
1003       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1004           $elapsed < 5) {
1005         Log ($jobstepid, "blaming failure on suspect node " .
1006              $slot[$proc{$pid}->{slot}]->{node}->{name});
1007         $temporary_fail ||= 1;
1008       }
1009       ban_node_by_slot($proc{$pid}->{slot});
1010     }
1011
1012     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1013                              ++$Jobstep->{'failures'},
1014                              $temporary_fail ? 'temporary ' : 'permanent',
1015                              $elapsed));
1016
1017     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1018       # Give up on this task, and the whole job
1019       $main::success = 0;
1020       $main::please_freeze = 1;
1021     }
1022     # Put this task back on the todo queue
1023     push @jobstep_todo, $jobstepid;
1024     $Job->{'tasks_summary'}->{'failed'}++;
1025   }
1026   else
1027   {
1028     ++$thisround_succeeded;
1029     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1030     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1031     push @jobstep_done, $jobstepid;
1032     Log ($jobstepid, "success in $elapsed seconds");
1033   }
1034   $Jobstep->{exitcode} = $childstatus;
1035   $Jobstep->{finishtime} = time;
1036   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1037   $Jobstep->{'arvados_task'}->save;
1038   process_stderr ($jobstepid, $task_success);
1039   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1040
1041   close $reader{$jobstepid};
1042   delete $reader{$jobstepid};
1043   delete $slot[$proc{$pid}->{slot}]->{pid};
1044   push @freeslot, $proc{$pid}->{slot};
1045   delete $proc{$pid};
1046
1047   if ($task_success) {
1048     # Load new tasks
1049     my $newtask_list = [];
1050     my $newtask_results;
1051     do {
1052       $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1053         'where' => {
1054           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1055         },
1056         'order' => 'qsequence',
1057         'offset' => scalar(@$newtask_list),
1058       );
1059       push(@$newtask_list, @{$newtask_results->{items}});
1060     } while (@{$newtask_results->{items}});
1061     foreach my $arvados_task (@$newtask_list) {
1062       my $jobstep = {
1063         'level' => $arvados_task->{'sequence'},
1064         'failures' => 0,
1065         'arvados_task' => $arvados_task
1066       };
1067       push @jobstep, $jobstep;
1068       push @jobstep_todo, $#jobstep;
1069     }
1070   }
1071
1072   $progress_is_dirty = 1;
1073   1;
1074 }
1075
1076 sub check_refresh_wanted
1077 {
1078   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1079   if (@stat && $stat[9] > $latest_refresh) {
1080     $latest_refresh = scalar time;
1081     my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1082     for my $attr ('cancelled_at',
1083                   'cancelled_by_user_uuid',
1084                   'cancelled_by_client_uuid',
1085                   'state') {
1086       $Job->{$attr} = $Job2->{$attr};
1087     }
1088     if ($Job->{'state'} ne "Running") {
1089       if ($Job->{'state'} eq "Cancelled") {
1090         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1091       } else {
1092         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1093       }
1094       $main::success = 0;
1095       $main::please_freeze = 1;
1096     }
1097   }
1098 }
1099
1100 sub check_squeue
1101 {
1102   # return if the kill list was checked <4 seconds ago
1103   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1104   {
1105     return;
1106   }
1107   $squeue_kill_checked = time;
1108
1109   # use killem() on procs whose killtime is reached
1110   for (keys %proc)
1111   {
1112     if (exists $proc{$_}->{killtime}
1113         && $proc{$_}->{killtime} <= time)
1114     {
1115       killem ($_);
1116     }
1117   }
1118
1119   # return if the squeue was checked <60 seconds ago
1120   if (defined $squeue_checked && $squeue_checked > time - 60)
1121   {
1122     return;
1123   }
1124   $squeue_checked = time;
1125
1126   if (!$have_slurm)
1127   {
1128     # here is an opportunity to check for mysterious problems with local procs
1129     return;
1130   }
1131
1132   # get a list of steps still running
1133   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1134   chop @squeue;
1135   if ($squeue[-1] ne "ok")
1136   {
1137     return;
1138   }
1139   pop @squeue;
1140
1141   # which of my jobsteps are running, according to squeue?
1142   my %ok;
1143   foreach (@squeue)
1144   {
1145     if (/^(\d+)\.(\d+) (\S+)/)
1146     {
1147       if ($1 eq $ENV{SLURM_JOBID})
1148       {
1149         $ok{$3} = 1;
1150       }
1151     }
1152   }
1153
1154   # which of my active child procs (>60s old) were not mentioned by squeue?
1155   foreach (keys %proc)
1156   {
1157     if ($proc{$_}->{time} < time - 60
1158         && !exists $ok{$proc{$_}->{jobstepname}}
1159         && !exists $proc{$_}->{killtime})
1160     {
1161       # kill this proc if it hasn't exited in 30 seconds
1162       $proc{$_}->{killtime} = time + 30;
1163     }
1164   }
1165 }
1166
1167
1168 sub release_allocation
1169 {
1170   if ($have_slurm)
1171   {
1172     Log (undef, "release job allocation");
1173     system "scancel $ENV{SLURM_JOBID}";
1174   }
1175 }
1176
1177
1178 sub readfrompipes
1179 {
1180   my $gotsome = 0;
1181   foreach my $job (keys %reader)
1182   {
1183     my $buf;
1184     while (0 < sysread ($reader{$job}, $buf, 8192))
1185     {
1186       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1187       $jobstep[$job]->{stderr} .= $buf;
1188       preprocess_stderr ($job);
1189       if (length ($jobstep[$job]->{stderr}) > 16384)
1190       {
1191         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1192       }
1193       $gotsome = 1;
1194     }
1195   }
1196   return $gotsome;
1197 }
1198
1199
1200 sub preprocess_stderr
1201 {
1202   my $job = shift;
1203
1204   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1205     my $line = $1;
1206     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1207     Log ($job, "stderr $line");
1208     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1209       # whoa.
1210       $main::please_freeze = 1;
1211     }
1212     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1213       $jobstep[$job]->{node_fail} = 1;
1214       ban_node_by_slot($jobstep[$job]->{slotindex});
1215     }
1216   }
1217 }
1218
1219
1220 sub process_stderr
1221 {
1222   my $job = shift;
1223   my $task_success = shift;
1224   preprocess_stderr ($job);
1225
1226   map {
1227     Log ($job, "stderr $_");
1228   } split ("\n", $jobstep[$job]->{stderr});
1229 }
1230
1231 sub fetch_block
1232 {
1233   my $hash = shift;
1234   my ($keep, $child_out, $output_block);
1235
1236   my $cmd = "arv-get \Q$hash\E";
1237   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1238   $output_block = '';
1239   while (1) {
1240     my $buf;
1241     my $bytes = sysread($keep, $buf, 1024 * 1024);
1242     if (!defined $bytes) {
1243       die "reading from arv-get: $!";
1244     } elsif ($bytes == 0) {
1245       # sysread returns 0 at the end of the pipe.
1246       last;
1247     } else {
1248       # some bytes were read into buf.
1249       $output_block .= $buf;
1250     }
1251   }
1252   close $keep;
1253   return $output_block;
1254 }
1255
1256 sub collate_output
1257 {
1258   Log (undef, "collate");
1259
1260   my ($child_out, $child_in);
1261   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1262                   '--retries', put_retry_count());
1263   my $joboutput;
1264   for (@jobstep)
1265   {
1266     next if (!exists $_->{'arvados_task'}->{'output'} ||
1267              !$_->{'arvados_task'}->{'success'});
1268     my $output = $_->{'arvados_task'}->{output};
1269     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1270     {
1271       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1272       print $child_in $output;
1273     }
1274     elsif (@jobstep == 1)
1275     {
1276       $joboutput = $output;
1277       last;
1278     }
1279     elsif (defined (my $outblock = fetch_block ($output)))
1280     {
1281       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1282       print $child_in $outblock;
1283     }
1284     else
1285     {
1286       Log (undef, "XXX fetch_block($output) failed XXX");
1287       $main::success = 0;
1288     }
1289   }
1290   $child_in->close;
1291
1292   if (!defined $joboutput) {
1293     my $s = IO::Select->new($child_out);
1294     if ($s->can_read(120)) {
1295       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1296       chomp($joboutput);
1297       # TODO: Ensure exit status == 0.
1298     } else {
1299       Log (undef, "timed out reading from 'arv-put'");
1300     }
1301   }
1302   # TODO: kill $pid instead of waiting, now that we've decided to
1303   # ignore further output.
1304   waitpid($pid, 0);
1305
1306   return $joboutput;
1307 }
1308
1309
1310 sub killem
1311 {
1312   foreach (@_)
1313   {
1314     my $sig = 2;                # SIGINT first
1315     if (exists $proc{$_}->{"sent_$sig"} &&
1316         time - $proc{$_}->{"sent_$sig"} > 4)
1317     {
1318       $sig = 15;                # SIGTERM if SIGINT doesn't work
1319     }
1320     if (exists $proc{$_}->{"sent_$sig"} &&
1321         time - $proc{$_}->{"sent_$sig"} > 4)
1322     {
1323       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1324     }
1325     if (!exists $proc{$_}->{"sent_$sig"})
1326     {
1327       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1328       kill $sig, $_;
1329       select (undef, undef, undef, 0.1);
1330       if ($sig == 2)
1331       {
1332         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1333       }
1334       $proc{$_}->{"sent_$sig"} = time;
1335       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1336     }
1337   }
1338 }
1339
1340
1341 sub fhbits
1342 {
1343   my($bits);
1344   for (@_) {
1345     vec($bits,fileno($_),1) = 1;
1346   }
1347   $bits;
1348 }
1349
1350
1351 # Send log output to Keep via arv-put.
1352 #
1353 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1354 # $log_pipe_pid is the pid of the arv-put subprocess.
1355 #
1356 # The only functions that should access these variables directly are:
1357 #
1358 # log_writer_start($logfilename)
1359 #     Starts an arv-put pipe, reading data on stdin and writing it to
1360 #     a $logfilename file in an output collection.
1361 #
1362 # log_writer_send($txt)
1363 #     Writes $txt to the output log collection.
1364 #
1365 # log_writer_finish()
1366 #     Closes the arv-put pipe and returns the output that it produces.
1367 #
1368 # log_writer_is_active()
1369 #     Returns a true value if there is currently a live arv-put
1370 #     process, false otherwise.
1371 #
1372 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1373
1374 sub log_writer_start($)
1375 {
1376   my $logfilename = shift;
1377   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1378                         'arv-put', '--portable-data-hash',
1379                         '--retries', '3',
1380                         '--filename', $logfilename,
1381                         '-');
1382 }
1383
1384 sub log_writer_send($)
1385 {
1386   my $txt = shift;
1387   print $log_pipe_in $txt;
1388 }
1389
1390 sub log_writer_finish()
1391 {
1392   return unless $log_pipe_pid;
1393
1394   close($log_pipe_in);
1395   my $arv_put_output;
1396
1397   my $s = IO::Select->new($log_pipe_out);
1398   if ($s->can_read(120)) {
1399     sysread($log_pipe_out, $arv_put_output, 1024);
1400     chomp($arv_put_output);
1401   } else {
1402     Log (undef, "timed out reading from 'arv-put'");
1403   }
1404
1405   waitpid($log_pipe_pid, 0);
1406   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1407   if ($?) {
1408     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1409   }
1410
1411   return $arv_put_output;
1412 }
1413
1414 sub log_writer_is_active() {
1415   return $log_pipe_pid;
1416 }
1417
1418 sub Log                         # ($jobstep_id, $logmessage)
1419 {
1420   if ($_[1] =~ /\n/) {
1421     for my $line (split (/\n/, $_[1])) {
1422       Log ($_[0], $line);
1423     }
1424     return;
1425   }
1426   my $fh = select STDERR; $|=1; select $fh;
1427   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1428   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1429   $message .= "\n";
1430   my $datetime;
1431   if (log_writer_is_active() || -t STDERR) {
1432     my @gmtime = gmtime;
1433     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1434                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1435   }
1436   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1437
1438   if (log_writer_is_active()) {
1439     log_writer_send($datetime . " " . $message);
1440   }
1441 }
1442
1443
1444 sub croak
1445 {
1446   my ($package, $file, $line) = caller;
1447   my $message = "@_ at $file line $line\n";
1448   Log (undef, $message);
1449   freeze() if @jobstep_todo;
1450   collate_output() if @jobstep_todo;
1451   cleanup();
1452   save_meta();
1453   die;
1454 }
1455
1456
1457 sub cleanup
1458 {
1459   return unless $Job;
1460   if ($Job->{'state'} eq 'Cancelled') {
1461     $Job->update_attributes('finished_at' => scalar gmtime);
1462   } else {
1463     $Job->update_attributes('state' => 'Failed');
1464   }
1465 }
1466
1467
1468 sub save_meta
1469 {
1470   my $justcheckpoint = shift; # false if this will be the last meta saved
1471   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1472   return unless log_writer_is_active();
1473
1474   my $loglocator = log_writer_finish();
1475   Log (undef, "log manifest is $loglocator");
1476   $Job->{'log'} = $loglocator;
1477   $Job->update_attributes('log', $loglocator);
1478 }
1479
1480
1481 sub freeze_if_want_freeze
1482 {
1483   if ($main::please_freeze)
1484   {
1485     release_allocation();
1486     if (@_)
1487     {
1488       # kill some srun procs before freeze+stop
1489       map { $proc{$_} = {} } @_;
1490       while (%proc)
1491       {
1492         killem (keys %proc);
1493         select (undef, undef, undef, 0.1);
1494         my $died;
1495         while (($died = waitpid (-1, WNOHANG)) > 0)
1496         {
1497           delete $proc{$died};
1498         }
1499       }
1500     }
1501     freeze();
1502     collate_output();
1503     cleanup();
1504     save_meta();
1505     exit 1;
1506   }
1507 }
1508
1509
1510 sub freeze
1511 {
1512   Log (undef, "Freeze not implemented");
1513   return;
1514 }
1515
1516
1517 sub thaw
1518 {
1519   croak ("Thaw not implemented");
1520 }
1521
1522
1523 sub freezequote
1524 {
1525   my $s = shift;
1526   $s =~ s/\\/\\\\/g;
1527   $s =~ s/\n/\\n/g;
1528   return $s;
1529 }
1530
1531
1532 sub freezeunquote
1533 {
1534   my $s = shift;
1535   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1536   return $s;
1537 }
1538
1539
1540 sub srun
1541 {
1542   my $srunargs = shift;
1543   my $execargs = shift;
1544   my $opts = shift || {};
1545   my $stdin = shift;
1546   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1547   print STDERR (join (" ",
1548                       map { / / ? "'$_'" : $_ }
1549                       (@$args)),
1550                 "\n")
1551       if $ENV{CRUNCH_DEBUG};
1552
1553   if (defined $stdin) {
1554     my $child = open STDIN, "-|";
1555     defined $child or die "no fork: $!";
1556     if ($child == 0) {
1557       print $stdin or die $!;
1558       close STDOUT or die $!;
1559       exit 0;
1560     }
1561   }
1562
1563   return system (@$args) if $opts->{fork};
1564
1565   exec @$args;
1566   warn "ENV size is ".length(join(" ",%ENV));
1567   die "exec failed: $!: @$args";
1568 }
1569
1570
1571 sub ban_node_by_slot {
1572   # Don't start any new jobsteps on this node for 60 seconds
1573   my $slotid = shift;
1574   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1575   $slot[$slotid]->{node}->{hold_count}++;
1576   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1577 }
1578
1579 sub must_lock_now
1580 {
1581   my ($lockfile, $error_message) = @_;
1582   open L, ">", $lockfile or croak("$lockfile: $!");
1583   if (!flock L, LOCK_EX|LOCK_NB) {
1584     croak("Can't lock $lockfile: $error_message\n");
1585   }
1586 }
1587
1588 sub find_docker_image {
1589   # Given a Keep locator, check to see if it contains a Docker image.
1590   # If so, return its stream name and Docker hash.
1591   # If not, return undef for both values.
1592   my $locator = shift;
1593   my ($streamname, $filename);
1594   if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1595     foreach my $line (split(/\n/, $image->{manifest_text})) {
1596       my @tokens = split(/\s+/, $line);
1597       next if (!@tokens);
1598       $streamname = shift(@tokens);
1599       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1600         if (defined($filename)) {
1601           return (undef, undef);  # More than one file in the Collection.
1602         } else {
1603           $filename = (split(/:/, $filedata, 3))[2];
1604         }
1605       }
1606     }
1607   }
1608   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1609     return ($streamname, $1);
1610   } else {
1611     return (undef, undef);
1612   }
1613 }
1614
1615 sub put_retry_count {
1616   # Calculate a --retries argument for arv-put that will have it try
1617   # approximately as long as this Job has been running.
1618   my $stoptime = shift || time;
1619   my $starttime = $jobstep[0]->{starttime};
1620   my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
1621   my $retries = 0;
1622   while ($timediff >= 2) {
1623     $retries++;
1624     $timediff /= 2;
1625   }
1626   return ($retries > 3) ? $retries : 3;
1627 }
1628
1629 sub exit_status_s {
1630   # Given a $?, return a human-readable exit code string like "0" or
1631   # "1" or "0 with signal 1" or "1 with signal 11".
1632   my $exitcode = shift;
1633   my $s = $exitcode >> 8;
1634   if ($exitcode & 0x7f) {
1635     $s .= " with signal " . ($exitcode & 0x7f);
1636   }
1637   if ($exitcode & 0x80) {
1638     $s .= " with core dump";
1639   }
1640   return $s;
1641 }
1642
1643 __DATA__
1644 #!/usr/bin/perl
1645
1646 # checkout-and-build
1647
1648 use Fcntl ':flock';
1649 use File::Path qw( make_path );
1650
1651 my $destdir = $ENV{"CRUNCH_SRC"};
1652 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1653 my $repo = $ENV{"CRUNCH_SRC_URL"};
1654 my $task_work = $ENV{"TASK_WORK"};
1655
1656 for my $dir ($destdir, $task_work) {
1657     if ($dir) {
1658         make_path $dir;
1659         -e $dir or die "Failed to create temporary directory ($dir): $!";
1660     }
1661 }
1662
1663 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1664 flock L, LOCK_EX;
1665 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1666     if (@ARGV) {
1667         exec(@ARGV);
1668         die "Cannot exec `@ARGV`: $!";
1669     } else {
1670         exit 0;
1671     }
1672 }
1673
1674 unlink "$destdir.commit";
1675 open STDOUT, ">", "$destdir.log";
1676 open STDERR, ">&STDOUT";
1677
1678 mkdir $destdir;
1679 my @git_archive_data = <DATA>;
1680 if (@git_archive_data) {
1681   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1682   print TARX @git_archive_data;
1683   if(!close(TARX)) {
1684     die "'tar -C $destdir -xf -' exited $?: $!";
1685   }
1686 }
1687
1688 my $pwd;
1689 chomp ($pwd = `pwd`);
1690 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1691 mkdir $install_dir;
1692
1693 for my $src_path ("$destdir/arvados/sdk/python") {
1694   if (-d $src_path) {
1695     shell_or_die ("virtualenv", $install_dir);
1696     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1697   }
1698 }
1699
1700 if (-e "$destdir/crunch_scripts/install") {
1701     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1702 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1703     # Old version
1704     shell_or_die ("./tests/autotests.sh", $install_dir);
1705 } elsif (-e "./install.sh") {
1706     shell_or_die ("./install.sh", $install_dir);
1707 }
1708
1709 if ($commit) {
1710     unlink "$destdir.commit.new";
1711     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1712     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1713 }
1714
1715 close L;
1716
1717 if (@ARGV) {
1718     exec(@ARGV);
1719     die "Cannot exec `@ARGV`: $!";
1720 } else {
1721     exit 0;
1722 }
1723
1724 sub shell_or_die
1725 {
1726   if ($ENV{"DEBUG"}) {
1727     print STDERR "@_\n";
1728   }
1729   system (@_) == 0
1730       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1731 }
1732
1733 __DATA__