Merge branch 'master' into 3618-column-ordering
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Digest::MD5 qw(md5_hex);
90 use Getopt::Long;
91 use IPC::Open2;
92 use IO::Select;
93 use File::Temp;
94 use Fcntl ':flock';
95 use File::Path qw( make_path remove_tree );
96
97 use constant EX_TEMPFAIL => 75;
98
99 $ENV{"TMPDIR"} ||= "/tmp";
100 unless (defined $ENV{"CRUNCH_TMP"}) {
101   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
102   if ($ENV{"USER"} ne "crunch" && $< != 0) {
103     # use a tmp dir unique for my uid
104     $ENV{"CRUNCH_TMP"} .= "-$<";
105   }
106 }
107
108 # Create the tmp directory if it does not exist
109 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
110   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
111 }
112
113 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
114 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
115 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
116 mkdir ($ENV{"JOB_WORK"});
117
118 my $force_unlock;
119 my $git_dir;
120 my $jobspec;
121 my $job_api_token;
122 my $no_clear_tmp;
123 my $resume_stash;
124 GetOptions('force-unlock' => \$force_unlock,
125            'git-dir=s' => \$git_dir,
126            'job=s' => \$jobspec,
127            'job-api-token=s' => \$job_api_token,
128            'no-clear-tmp' => \$no_clear_tmp,
129            'resume-stash=s' => \$resume_stash,
130     );
131
132 if (defined $job_api_token) {
133   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
134 }
135
136 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
137 my $local_job = 0;
138
139
140 $SIG{'USR1'} = sub
141 {
142   $main::ENV{CRUNCH_DEBUG} = 1;
143 };
144 $SIG{'USR2'} = sub
145 {
146   $main::ENV{CRUNCH_DEBUG} = 0;
147 };
148
149
150
151 my $arv = Arvados->new('apiVersion' => 'v1');
152
153 my $Job;
154 my $job_id;
155 my $dbh;
156 my $sth;
157 my @jobstep;
158
159 my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
160
161 if ($jobspec =~ /^[-a-z\d]+$/)
162 {
163   # $jobspec is an Arvados UUID, not a JSON job specification
164   $Job = retry_op(sub {
165     $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
166   });
167   if (!$force_unlock) {
168     # Claim this job, and make sure nobody else does
169     eval { retry_op(sub {
170       # lock() sets is_locked_by_uuid and changes state to Running.
171       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
172     }); };
173     if ($@) {
174       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
175       exit EX_TEMPFAIL;
176     };
177   }
178 }
179 else
180 {
181   $Job = JSON::decode_json($jobspec);
182
183   if (!$resume_stash)
184   {
185     map { croak ("No $_ specified") unless $Job->{$_} }
186     qw(script script_version script_parameters);
187   }
188
189   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
190   $Job->{'started_at'} = gmtime;
191   $Job->{'state'} = 'Running';
192
193   $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
194 }
195 $job_id = $Job->{'uuid'};
196
197 my $keep_logfile = $job_id . '.log.txt';
198 log_writer_start($keep_logfile);
199
200 $Job->{'runtime_constraints'} ||= {};
201 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
202 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
203
204
205 Log (undef, "check slurm allocation");
206 my @slot;
207 my @node;
208 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
209 my @sinfo;
210 if (!$have_slurm)
211 {
212   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
213   push @sinfo, "$localcpus localhost";
214 }
215 if (exists $ENV{SLURM_NODELIST})
216 {
217   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
218 }
219 foreach (@sinfo)
220 {
221   my ($ncpus, $slurm_nodelist) = split;
222   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
223
224   my @nodelist;
225   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
226   {
227     my $nodelist = $1;
228     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
229     {
230       my $ranges = $1;
231       foreach (split (",", $ranges))
232       {
233         my ($a, $b);
234         if (/(\d+)-(\d+)/)
235         {
236           $a = $1;
237           $b = $2;
238         }
239         else
240         {
241           $a = $_;
242           $b = $_;
243         }
244         push @nodelist, map {
245           my $n = $nodelist;
246           $n =~ s/\[[-,\d]+\]/$_/;
247           $n;
248         } ($a..$b);
249       }
250     }
251     else
252     {
253       push @nodelist, $nodelist;
254     }
255   }
256   foreach my $nodename (@nodelist)
257   {
258     Log (undef, "node $nodename - $ncpus slots");
259     my $node = { name => $nodename,
260                  ncpus => $ncpus,
261                  losing_streak => 0,
262                  hold_until => 0 };
263     foreach my $cpu (1..$ncpus)
264     {
265       push @slot, { node => $node,
266                     cpu => $cpu };
267     }
268   }
269   push @node, @nodelist;
270 }
271
272
273
274 # Ensure that we get one jobstep running on each allocated node before
275 # we start overloading nodes with concurrent steps
276
277 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
278
279
280 $Job->update_attributes(
281   'tasks_summary' => { 'failed' => 0,
282                        'todo' => 1,
283                        'running' => 0,
284                        'done' => 0 });
285
286 Log (undef, "start");
287 $SIG{'INT'} = sub { $main::please_freeze = 1; };
288 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
289 $SIG{'TERM'} = \&croak;
290 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
291 $SIG{'ALRM'} = sub { $main::please_info = 1; };
292 $SIG{'CONT'} = sub { $main::please_continue = 1; };
293 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
294
295 $main::please_freeze = 0;
296 $main::please_info = 0;
297 $main::please_continue = 0;
298 $main::please_refresh = 0;
299 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
300
301 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
302 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
303 $ENV{"JOB_UUID"} = $job_id;
304
305
306 my @jobstep_todo = ();
307 my @jobstep_done = ();
308 my @jobstep_tomerge = ();
309 my $jobstep_tomerge_level = 0;
310 my $squeue_checked;
311 my $squeue_kill_checked;
312 my $output_in_keep = 0;
313 my $latest_refresh = scalar time;
314
315
316
317 if (defined $Job->{thawedfromkey})
318 {
319   thaw ($Job->{thawedfromkey});
320 }
321 else
322 {
323   my $first_task = retry_op(sub {
324     $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
325       'job_uuid' => $Job->{'uuid'},
326       'sequence' => 0,
327       'qsequence' => 0,
328       'parameters' => {},
329     });
330   });
331   push @jobstep, { 'level' => 0,
332                    'failures' => 0,
333                    'arvados_task' => $first_task,
334                  };
335   push @jobstep_todo, 0;
336 }
337
338
339 if (!$have_slurm)
340 {
341   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
342 }
343
344
345 my $build_script;
346 do {
347   local $/ = undef;
348   $build_script = <DATA>;
349 };
350 my $nodelist = join(",", @node);
351
352 if (!defined $no_clear_tmp) {
353   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
354   Log (undef, "Clean work dirs");
355
356   my $cleanpid = fork();
357   if ($cleanpid == 0)
358   {
359     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
360           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
361     exit (1);
362   }
363   while (1)
364   {
365     last if $cleanpid == waitpid (-1, WNOHANG);
366     freeze_if_want_freeze ($cleanpid);
367     select (undef, undef, undef, 0.1);
368   }
369   Log (undef, "Cleanup command exited ".exit_status_s($?));
370 }
371
372
373 my $git_archive;
374 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
375   # If script_version looks like an absolute path, *and* the --git-dir
376   # argument was not given -- which implies we were not invoked by
377   # crunch-dispatch -- we will use the given path as a working
378   # directory instead of resolving script_version to a git commit (or
379   # doing anything else with git).
380   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
381   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
382 }
383 else {
384   # Resolve the given script_version to a git commit sha1. Also, if
385   # the repository is remote, clone it into our local filesystem: this
386   # ensures "git archive" will work, and is necessary to reliably
387   # resolve a symbolic script_version like "master^".
388   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
389
390   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
391
392   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
393
394   # If we're running under crunch-dispatch, it will have already
395   # pulled the appropriate source tree into its own repository, and
396   # given us that repo's path as $git_dir.
397   #
398   # If we're running a "local" job, we might have to fetch content
399   # from a remote repository.
400   #
401   # (Currently crunch-dispatch gives a local path with --git-dir, but
402   # we might as well accept URLs there too in case it changes its
403   # mind.)
404   my $repo = $git_dir || $Job->{'repository'};
405
406   # Repository can be remote or local. If remote, we'll need to fetch it
407   # to a local dir before doing `git log` et al.
408   my $repo_location;
409
410   if ($repo =~ m{://|^[^/]*:}) {
411     # $repo is a git url we can clone, like git:// or https:// or
412     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
413     # not recognized here because distinguishing that from a local
414     # path is too fragile. If you really need something strange here,
415     # use the ssh:// form.
416     $repo_location = 'remote';
417   } elsif ($repo =~ m{^\.*/}) {
418     # $repo is a local path to a git index. We'll also resolve ../foo
419     # to ../foo/.git if the latter is a directory. To help
420     # disambiguate local paths from named hosted repositories, this
421     # form must be given as ./ or ../ if it's a relative path.
422     if (-d "$repo/.git") {
423       $repo = "$repo/.git";
424     }
425     $repo_location = 'local';
426   } else {
427     # $repo is none of the above. It must be the name of a hosted
428     # repository.
429     my $arv_repo_list = retry_op(sub {
430       $arv->{'repositories'}->{'list'}->execute(
431         'filters' => [['name','=',$repo]]);
432     });
433     my @repos_found = @{$arv_repo_list->{'items'}};
434     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
435     if ($n_found > 0) {
436       Log(undef, "Repository '$repo' -> "
437           . join(", ", map { $_->{'uuid'} } @repos_found));
438     }
439     if ($n_found != 1) {
440       croak("Error: Found $n_found repositories with name '$repo'.");
441     }
442     $repo = $repos_found[0]->{'fetch_url'};
443     $repo_location = 'remote';
444   }
445   Log(undef, "Using $repo_location repository '$repo'");
446   $ENV{"CRUNCH_SRC_URL"} = $repo;
447
448   # Resolve given script_version (we'll call that $treeish here) to a
449   # commit sha1 ($commit).
450   my $treeish = $Job->{'script_version'};
451   my $commit;
452   if ($repo_location eq 'remote') {
453     # We minimize excess object-fetching by re-using the same bare
454     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
455     # just keep adding remotes to it as needed.
456     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
457     my $gitcmd = "git --git-dir=\Q$local_repo\E";
458
459     # Set up our local repo for caching remote objects, making
460     # archives, etc.
461     if (!-d $local_repo) {
462       make_path($local_repo) or croak("Error: could not create $local_repo");
463     }
464     # This works (exits 0 and doesn't delete fetched objects) even
465     # if $local_repo is already initialized:
466     `$gitcmd init --bare`;
467     if ($?) {
468       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
469     }
470
471     # If $treeish looks like a hash (or abbrev hash) we look it up in
472     # our local cache first, since that's cheaper. (We don't want to
473     # do that with tags/branches though -- those change over time, so
474     # they should always be resolved by the remote repo.)
475     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
476       # Hide stderr because it's normal for this to fail:
477       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
478       if ($? == 0 &&
479           # Careful not to resolve a branch named abcdeff to commit 1234567:
480           $sha1 =~ /^$treeish/ &&
481           $sha1 =~ /^([0-9a-f]{40})$/s) {
482         $commit = $1;
483         Log(undef, "Commit $commit already present in $local_repo");
484       }
485     }
486
487     if (!defined $commit) {
488       # If $treeish isn't just a hash or abbrev hash, or isn't here
489       # yet, we need to fetch the remote to resolve it correctly.
490
491       # First, remove all local heads. This prevents a name that does
492       # not exist on the remote from resolving to (or colliding with)
493       # a previously fetched branch or tag (possibly from a different
494       # remote).
495       remove_tree("$local_repo/refs/heads", {keep_root => 1});
496
497       Log(undef, "Fetching objects from $repo to $local_repo");
498       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
499       if ($?) {
500         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
501       }
502     }
503
504     # Now that the data is all here, we will use our local repo for
505     # the rest of our git activities.
506     $repo = $local_repo;
507   }
508
509   my $gitcmd = "git --git-dir=\Q$repo\E";
510   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
511   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
512     croak("`$gitcmd rev-list` exited "
513           .exit_status_s($?)
514           .", '$treeish' not found. Giving up.");
515   }
516   $commit = $1;
517   Log(undef, "Version $treeish is commit $commit");
518
519   if ($commit ne $Job->{'script_version'}) {
520     # Record the real commit id in the database, frozentokey, logs,
521     # etc. -- instead of an abbreviation or a branch name which can
522     # become ambiguous or point to a different commit in the future.
523     if (!$Job->update_attributes('script_version' => $commit)) {
524       croak("Error: failed to update job's script_version attribute");
525     }
526   }
527
528   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
529   $git_archive = `$gitcmd archive ''\Q$commit\E`;
530   if ($?) {
531     croak("Error: $gitcmd archive exited ".exit_status_s($?));
532   }
533 }
534
535 if (!defined $git_archive) {
536   Log(undef, "Skip install phase (no git archive)");
537   if ($have_slurm) {
538     Log(undef, "Warning: This probably means workers have no source tree!");
539   }
540 }
541 else {
542   Log(undef, "Run install script on all workers");
543
544   my @srunargs = ("srun",
545                   "--nodelist=$nodelist",
546                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
547   my @execargs = ("sh", "-c",
548                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
549
550   # Note: this section is almost certainly unnecessary if we're
551   # running tasks in docker containers.
552   my $installpid = fork();
553   if ($installpid == 0)
554   {
555     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
556     exit (1);
557   }
558   while (1)
559   {
560     last if $installpid == waitpid (-1, WNOHANG);
561     freeze_if_want_freeze ($installpid);
562     select (undef, undef, undef, 0.1);
563   }
564   Log (undef, "Install script exited ".exit_status_s($?));
565 }
566
567 if (!$have_slurm)
568 {
569   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
570   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
571 }
572
573 # If this job requires a Docker image, install that.
574 my $docker_bin = "/usr/bin/docker.io";
575 my ($docker_locator, $docker_stream, $docker_hash);
576 if ($docker_locator = $Job->{docker_image_locator}) {
577   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
578   if (!$docker_hash)
579   {
580     croak("No Docker image hash found from locator $docker_locator");
581   }
582   $docker_stream =~ s/^\.//;
583   my $docker_install_script = qq{
584 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
585     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
586 fi
587 };
588   my $docker_pid = fork();
589   if ($docker_pid == 0)
590   {
591     srun (["srun", "--nodelist=" . join(',', @node)],
592           ["/bin/sh", "-ec", $docker_install_script]);
593     exit ($?);
594   }
595   while (1)
596   {
597     last if $docker_pid == waitpid (-1, WNOHANG);
598     freeze_if_want_freeze ($docker_pid);
599     select (undef, undef, undef, 0.1);
600   }
601   if ($? != 0)
602   {
603     croak("Installing Docker image from $docker_locator exited "
604           .exit_status_s($?));
605   }
606 }
607
608 foreach (qw (script script_version script_parameters runtime_constraints))
609 {
610   Log (undef,
611        "$_ " .
612        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
613 }
614 foreach (split (/\n/, $Job->{knobs}))
615 {
616   Log (undef, "knob " . $_);
617 }
618
619
620
621 $main::success = undef;
622
623
624
625 ONELEVEL:
626
627 my $thisround_succeeded = 0;
628 my $thisround_failed = 0;
629 my $thisround_failed_multiple = 0;
630
631 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
632                        or $a <=> $b } @jobstep_todo;
633 my $level = $jobstep[$jobstep_todo[0]]->{level};
634 Log (undef, "start level $level");
635
636
637
638 my %proc;
639 my @freeslot = (0..$#slot);
640 my @holdslot;
641 my %reader;
642 my $progress_is_dirty = 1;
643 my $progress_stats_updated = 0;
644
645 update_progress_stats();
646
647
648
649 THISROUND:
650 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
651 {
652   my $id = $jobstep_todo[$todo_ptr];
653   my $Jobstep = $jobstep[$id];
654   if ($Jobstep->{level} != $level)
655   {
656     next;
657   }
658
659   pipe $reader{$id}, "writer" or croak ($!);
660   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
661   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
662
663   my $childslot = $freeslot[0];
664   my $childnode = $slot[$childslot]->{node};
665   my $childslotname = join (".",
666                             $slot[$childslot]->{node}->{name},
667                             $slot[$childslot]->{cpu});
668   my $childpid = fork();
669   if ($childpid == 0)
670   {
671     $SIG{'INT'} = 'DEFAULT';
672     $SIG{'QUIT'} = 'DEFAULT';
673     $SIG{'TERM'} = 'DEFAULT';
674
675     foreach (values (%reader))
676     {
677       close($_);
678     }
679     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
680     open(STDOUT,">&writer");
681     open(STDERR,">&writer");
682
683     undef $dbh;
684     undef $sth;
685
686     delete $ENV{"GNUPGHOME"};
687     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
688     $ENV{"TASK_QSEQUENCE"} = $id;
689     $ENV{"TASK_SEQUENCE"} = $level;
690     $ENV{"JOB_SCRIPT"} = $Job->{script};
691     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
692       $param =~ tr/a-z/A-Z/;
693       $ENV{"JOB_PARAMETER_$param"} = $value;
694     }
695     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
696     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
697     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
698     $ENV{"HOME"} = $ENV{"TASK_WORK"};
699     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
700     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
701     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
702     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
703
704     $ENV{"GZIP"} = "-n";
705
706     my @srunargs = (
707       "srun",
708       "--nodelist=".$childnode->{name},
709       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
710       "--job-name=$job_id.$id.$$",
711         );
712     my $build_script_to_send = "";
713     my $command =
714         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
715         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
716         ."&& cd $ENV{CRUNCH_TMP} ";
717     if ($build_script)
718     {
719       $build_script_to_send = $build_script;
720       $command .=
721           "&& perl -";
722     }
723     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
724     if ($docker_hash)
725     {
726       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
727       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
728       # Dynamically configure the container to use the host system as its
729       # DNS server.  Get the host's global addresses from the ip command,
730       # and turn them into docker --dns options using gawk.
731       $command .=
732           q{$(ip -o address show scope global |
733               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
734       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
735       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
736       $command .= "--env=\QHOME=/home/crunch\E ";
737       while (my ($env_key, $env_val) = each %ENV)
738       {
739         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
740           if ($env_key eq "TASK_WORK") {
741             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
742           }
743           elsif ($env_key eq "TASK_KEEPMOUNT") {
744             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
745           }
746           else {
747             $command .= "--env=\Q$env_key=$env_val\E ";
748           }
749         }
750       }
751       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
752       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
753       $command .= "\Q$docker_hash\E ";
754       $command .= "stdbuf --output=0 --error=0 ";
755       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
756     } else {
757       # Non-docker run
758       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
759       $command .= "stdbuf --output=0 --error=0 ";
760       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
761     }
762
763     my @execargs = ('bash', '-c', $command);
764     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
765     # exec() failed, we assume nothing happened.
766     Log(undef, "srun() failed on build script");
767     die;
768   }
769   close("writer");
770   if (!defined $childpid)
771   {
772     close $reader{$id};
773     delete $reader{$id};
774     next;
775   }
776   shift @freeslot;
777   $proc{$childpid} = { jobstep => $id,
778                        time => time,
779                        slot => $childslot,
780                        jobstepname => "$job_id.$id.$childpid",
781                      };
782   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
783   $slot[$childslot]->{pid} = $childpid;
784
785   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
786   Log ($id, "child $childpid started on $childslotname");
787   $Jobstep->{starttime} = time;
788   $Jobstep->{node} = $childnode->{name};
789   $Jobstep->{slotindex} = $childslot;
790   delete $Jobstep->{stderr};
791   delete $Jobstep->{finishtime};
792
793   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
794   $Jobstep->{'arvados_task'}->save;
795
796   splice @jobstep_todo, $todo_ptr, 1;
797   --$todo_ptr;
798
799   $progress_is_dirty = 1;
800
801   while (!@freeslot
802          ||
803          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
804   {
805     last THISROUND if $main::please_freeze;
806     if ($main::please_info)
807     {
808       $main::please_info = 0;
809       freeze();
810       collate_output();
811       save_meta(1);
812       update_progress_stats();
813     }
814     my $gotsome
815         = readfrompipes ()
816         + reapchildren ();
817     if (!$gotsome)
818     {
819       check_refresh_wanted();
820       check_squeue();
821       update_progress_stats();
822       select (undef, undef, undef, 0.1);
823     }
824     elsif (time - $progress_stats_updated >= 30)
825     {
826       update_progress_stats();
827     }
828     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
829         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
830     {
831       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
832           .($thisround_failed+$thisround_succeeded)
833           .") -- giving up on this round";
834       Log (undef, $message);
835       last THISROUND;
836     }
837
838     # move slots from freeslot to holdslot (or back to freeslot) if necessary
839     for (my $i=$#freeslot; $i>=0; $i--) {
840       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
841         push @holdslot, (splice @freeslot, $i, 1);
842       }
843     }
844     for (my $i=$#holdslot; $i>=0; $i--) {
845       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
846         push @freeslot, (splice @holdslot, $i, 1);
847       }
848     }
849
850     # give up if no nodes are succeeding
851     if (!grep { $_->{node}->{losing_streak} == 0 &&
852                     $_->{node}->{hold_count} < 4 } @slot) {
853       my $message = "Every node has failed -- giving up on this round";
854       Log (undef, $message);
855       last THISROUND;
856     }
857   }
858 }
859
860
861 push @freeslot, splice @holdslot;
862 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
863
864
865 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
866 while (%proc)
867 {
868   if ($main::please_continue) {
869     $main::please_continue = 0;
870     goto THISROUND;
871   }
872   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
873   readfrompipes ();
874   if (!reapchildren())
875   {
876     check_refresh_wanted();
877     check_squeue();
878     update_progress_stats();
879     select (undef, undef, undef, 0.1);
880     killem (keys %proc) if $main::please_freeze;
881   }
882 }
883
884 update_progress_stats();
885 freeze_if_want_freeze();
886
887
888 if (!defined $main::success)
889 {
890   if (@jobstep_todo &&
891       $thisround_succeeded == 0 &&
892       ($thisround_failed == 0 || $thisround_failed > 4))
893   {
894     my $message = "stop because $thisround_failed tasks failed and none succeeded";
895     Log (undef, $message);
896     $main::success = 0;
897   }
898   if (!@jobstep_todo)
899   {
900     $main::success = 1;
901   }
902 }
903
904 goto ONELEVEL if !defined $main::success;
905
906
907 release_allocation();
908 freeze();
909 my $collated_output = &collate_output();
910
911 if (!$collated_output) {
912   Log(undef, "output undef");
913 }
914 else {
915   eval {
916     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
917         or die "failed to get collated manifest: $!";
918     my $orig_manifest_text = '';
919     while (my $manifest_line = <$orig_manifest>) {
920       $orig_manifest_text .= $manifest_line;
921     }
922     my $output = retry_op(sub {
923       $arv->{'collections'}->{'create'}->execute(
924         'collection' => {'manifest_text' => $orig_manifest_text});
925     });
926     Log(undef, "output uuid " . $output->{uuid});
927     Log(undef, "output hash " . $output->{portable_data_hash});
928     $Job->update_attributes('output' => $output->{portable_data_hash});
929   };
930   if ($@) {
931     Log (undef, "Failed to register output manifest: $@");
932   }
933 }
934
935 Log (undef, "finish");
936
937 save_meta();
938
939 my $final_state;
940 if ($collated_output && $main::success) {
941   $final_state = 'Complete';
942 } else {
943   $final_state = 'Failed';
944 }
945 $Job->update_attributes('state' => $final_state);
946
947 exit (($final_state eq 'Complete') ? 0 : 1);
948
949
950
951 sub update_progress_stats
952 {
953   $progress_stats_updated = time;
954   return if !$progress_is_dirty;
955   my ($todo, $done, $running) = (scalar @jobstep_todo,
956                                  scalar @jobstep_done,
957                                  scalar @slot - scalar @freeslot - scalar @holdslot);
958   $Job->{'tasks_summary'} ||= {};
959   $Job->{'tasks_summary'}->{'todo'} = $todo;
960   $Job->{'tasks_summary'}->{'done'} = $done;
961   $Job->{'tasks_summary'}->{'running'} = $running;
962   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
963   Log (undef, "status: $done done, $running running, $todo todo");
964   $progress_is_dirty = 0;
965 }
966
967
968
969 sub reapchildren
970 {
971   my $pid = waitpid (-1, WNOHANG);
972   return 0 if $pid <= 0;
973
974   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
975                   . "."
976                   . $slot[$proc{$pid}->{slot}]->{cpu});
977   my $jobstepid = $proc{$pid}->{jobstep};
978   my $elapsed = time - $proc{$pid}->{time};
979   my $Jobstep = $jobstep[$jobstepid];
980
981   my $childstatus = $?;
982   my $exitvalue = $childstatus >> 8;
983   my $exitinfo = "exit ".exit_status_s($childstatus);
984   $Jobstep->{'arvados_task'}->reload;
985   my $task_success = $Jobstep->{'arvados_task'}->{success};
986
987   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
988
989   if (!defined $task_success) {
990     # task did not indicate one way or the other --> fail
991     $Jobstep->{'arvados_task'}->{success} = 0;
992     $Jobstep->{'arvados_task'}->save;
993     $task_success = 0;
994   }
995
996   if (!$task_success)
997   {
998     my $temporary_fail;
999     $temporary_fail ||= $Jobstep->{node_fail};
1000     $temporary_fail ||= ($exitvalue == 111);
1001
1002     ++$thisround_failed;
1003     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1004
1005     # Check for signs of a failed or misconfigured node
1006     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1007         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1008       # Don't count this against jobstep failure thresholds if this
1009       # node is already suspected faulty and srun exited quickly
1010       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1011           $elapsed < 5) {
1012         Log ($jobstepid, "blaming failure on suspect node " .
1013              $slot[$proc{$pid}->{slot}]->{node}->{name});
1014         $temporary_fail ||= 1;
1015       }
1016       ban_node_by_slot($proc{$pid}->{slot});
1017     }
1018
1019     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1020                              ++$Jobstep->{'failures'},
1021                              $temporary_fail ? 'temporary ' : 'permanent',
1022                              $elapsed));
1023
1024     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1025       # Give up on this task, and the whole job
1026       $main::success = 0;
1027       $main::please_freeze = 1;
1028     }
1029     # Put this task back on the todo queue
1030     push @jobstep_todo, $jobstepid;
1031     $Job->{'tasks_summary'}->{'failed'}++;
1032   }
1033   else
1034   {
1035     ++$thisround_succeeded;
1036     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1037     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1038     push @jobstep_done, $jobstepid;
1039     Log ($jobstepid, "success in $elapsed seconds");
1040   }
1041   $Jobstep->{exitcode} = $childstatus;
1042   $Jobstep->{finishtime} = time;
1043   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1044   $Jobstep->{'arvados_task'}->save;
1045   process_stderr ($jobstepid, $task_success);
1046   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1047
1048   close $reader{$jobstepid};
1049   delete $reader{$jobstepid};
1050   delete $slot[$proc{$pid}->{slot}]->{pid};
1051   push @freeslot, $proc{$pid}->{slot};
1052   delete $proc{$pid};
1053
1054   if ($task_success) {
1055     # Load new tasks
1056     my $newtask_list = [];
1057     my $newtask_results;
1058     do {
1059       $newtask_results = retry_op(sub {
1060         $arv->{'job_tasks'}->{'list'}->execute(
1061           'where' => {
1062             'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1063           },
1064           'order' => 'qsequence',
1065           'offset' => scalar(@$newtask_list),
1066         );
1067       });
1068       push(@$newtask_list, @{$newtask_results->{items}});
1069     } while (@{$newtask_results->{items}});
1070     foreach my $arvados_task (@$newtask_list) {
1071       my $jobstep = {
1072         'level' => $arvados_task->{'sequence'},
1073         'failures' => 0,
1074         'arvados_task' => $arvados_task
1075       };
1076       push @jobstep, $jobstep;
1077       push @jobstep_todo, $#jobstep;
1078     }
1079   }
1080
1081   $progress_is_dirty = 1;
1082   1;
1083 }
1084
1085 sub check_refresh_wanted
1086 {
1087   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1088   if (@stat && $stat[9] > $latest_refresh) {
1089     $latest_refresh = scalar time;
1090     my $Job2 = retry_op(sub {
1091       $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1092     });
1093     for my $attr ('cancelled_at',
1094                   'cancelled_by_user_uuid',
1095                   'cancelled_by_client_uuid',
1096                   'state') {
1097       $Job->{$attr} = $Job2->{$attr};
1098     }
1099     if ($Job->{'state'} ne "Running") {
1100       if ($Job->{'state'} eq "Cancelled") {
1101         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1102       } else {
1103         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1104       }
1105       $main::success = 0;
1106       $main::please_freeze = 1;
1107     }
1108   }
1109 }
1110
1111 sub check_squeue
1112 {
1113   # return if the kill list was checked <4 seconds ago
1114   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1115   {
1116     return;
1117   }
1118   $squeue_kill_checked = time;
1119
1120   # use killem() on procs whose killtime is reached
1121   for (keys %proc)
1122   {
1123     if (exists $proc{$_}->{killtime}
1124         && $proc{$_}->{killtime} <= time)
1125     {
1126       killem ($_);
1127     }
1128   }
1129
1130   # return if the squeue was checked <60 seconds ago
1131   if (defined $squeue_checked && $squeue_checked > time - 60)
1132   {
1133     return;
1134   }
1135   $squeue_checked = time;
1136
1137   if (!$have_slurm)
1138   {
1139     # here is an opportunity to check for mysterious problems with local procs
1140     return;
1141   }
1142
1143   # get a list of steps still running
1144   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1145   chop @squeue;
1146   if ($squeue[-1] ne "ok")
1147   {
1148     return;
1149   }
1150   pop @squeue;
1151
1152   # which of my jobsteps are running, according to squeue?
1153   my %ok;
1154   foreach (@squeue)
1155   {
1156     if (/^(\d+)\.(\d+) (\S+)/)
1157     {
1158       if ($1 eq $ENV{SLURM_JOBID})
1159       {
1160         $ok{$3} = 1;
1161       }
1162     }
1163   }
1164
1165   # which of my active child procs (>60s old) were not mentioned by squeue?
1166   foreach (keys %proc)
1167   {
1168     if ($proc{$_}->{time} < time - 60
1169         && !exists $ok{$proc{$_}->{jobstepname}}
1170         && !exists $proc{$_}->{killtime})
1171     {
1172       # kill this proc if it hasn't exited in 30 seconds
1173       $proc{$_}->{killtime} = time + 30;
1174     }
1175   }
1176 }
1177
1178
1179 sub release_allocation
1180 {
1181   if ($have_slurm)
1182   {
1183     Log (undef, "release job allocation");
1184     system "scancel $ENV{SLURM_JOBID}";
1185   }
1186 }
1187
1188
1189 sub readfrompipes
1190 {
1191   my $gotsome = 0;
1192   foreach my $job (keys %reader)
1193   {
1194     my $buf;
1195     while (0 < sysread ($reader{$job}, $buf, 8192))
1196     {
1197       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1198       $jobstep[$job]->{stderr} .= $buf;
1199       preprocess_stderr ($job);
1200       if (length ($jobstep[$job]->{stderr}) > 16384)
1201       {
1202         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1203       }
1204       $gotsome = 1;
1205     }
1206   }
1207   return $gotsome;
1208 }
1209
1210
1211 sub preprocess_stderr
1212 {
1213   my $job = shift;
1214
1215   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1216     my $line = $1;
1217     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1218     Log ($job, "stderr $line");
1219     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1220       # whoa.
1221       $main::please_freeze = 1;
1222     }
1223     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1224       $jobstep[$job]->{node_fail} = 1;
1225       ban_node_by_slot($jobstep[$job]->{slotindex});
1226     }
1227   }
1228 }
1229
1230
1231 sub process_stderr
1232 {
1233   my $job = shift;
1234   my $task_success = shift;
1235   preprocess_stderr ($job);
1236
1237   map {
1238     Log ($job, "stderr $_");
1239   } split ("\n", $jobstep[$job]->{stderr});
1240 }
1241
1242 sub fetch_block
1243 {
1244   my $hash = shift;
1245   my ($keep, $child_out, $output_block);
1246
1247   my $cmd = "arv-get \Q$hash\E";
1248   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1249   $output_block = '';
1250   while (1) {
1251     my $buf;
1252     my $bytes = sysread($keep, $buf, 1024 * 1024);
1253     if (!defined $bytes) {
1254       die "reading from arv-get: $!";
1255     } elsif ($bytes == 0) {
1256       # sysread returns 0 at the end of the pipe.
1257       last;
1258     } else {
1259       # some bytes were read into buf.
1260       $output_block .= $buf;
1261     }
1262   }
1263   close $keep;
1264   return $output_block;
1265 }
1266
1267 sub collate_output
1268 {
1269   Log (undef, "collate");
1270
1271   my ($child_out, $child_in);
1272   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1273                   '--retries', retry_count());
1274   my $joboutput;
1275   for (@jobstep)
1276   {
1277     next if (!exists $_->{'arvados_task'}->{'output'} ||
1278              !$_->{'arvados_task'}->{'success'});
1279     my $output = $_->{'arvados_task'}->{output};
1280     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1281     {
1282       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1283       print $child_in $output;
1284     }
1285     elsif (@jobstep == 1)
1286     {
1287       $joboutput = $output;
1288       last;
1289     }
1290     elsif (defined (my $outblock = fetch_block ($output)))
1291     {
1292       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1293       print $child_in $outblock;
1294     }
1295     else
1296     {
1297       Log (undef, "XXX fetch_block($output) failed XXX");
1298       $main::success = 0;
1299     }
1300   }
1301   $child_in->close;
1302
1303   if (!defined $joboutput) {
1304     my $s = IO::Select->new($child_out);
1305     if ($s->can_read(120)) {
1306       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1307       chomp($joboutput);
1308       # TODO: Ensure exit status == 0.
1309     } else {
1310       Log (undef, "timed out reading from 'arv-put'");
1311     }
1312   }
1313   # TODO: kill $pid instead of waiting, now that we've decided to
1314   # ignore further output.
1315   waitpid($pid, 0);
1316
1317   return $joboutput;
1318 }
1319
1320
1321 sub killem
1322 {
1323   foreach (@_)
1324   {
1325     my $sig = 2;                # SIGINT first
1326     if (exists $proc{$_}->{"sent_$sig"} &&
1327         time - $proc{$_}->{"sent_$sig"} > 4)
1328     {
1329       $sig = 15;                # SIGTERM if SIGINT doesn't work
1330     }
1331     if (exists $proc{$_}->{"sent_$sig"} &&
1332         time - $proc{$_}->{"sent_$sig"} > 4)
1333     {
1334       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1335     }
1336     if (!exists $proc{$_}->{"sent_$sig"})
1337     {
1338       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1339       kill $sig, $_;
1340       select (undef, undef, undef, 0.1);
1341       if ($sig == 2)
1342       {
1343         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1344       }
1345       $proc{$_}->{"sent_$sig"} = time;
1346       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1347     }
1348   }
1349 }
1350
1351
1352 sub fhbits
1353 {
1354   my($bits);
1355   for (@_) {
1356     vec($bits,fileno($_),1) = 1;
1357   }
1358   $bits;
1359 }
1360
1361
1362 # Send log output to Keep via arv-put.
1363 #
1364 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1365 # $log_pipe_pid is the pid of the arv-put subprocess.
1366 #
1367 # The only functions that should access these variables directly are:
1368 #
1369 # log_writer_start($logfilename)
1370 #     Starts an arv-put pipe, reading data on stdin and writing it to
1371 #     a $logfilename file in an output collection.
1372 #
1373 # log_writer_send($txt)
1374 #     Writes $txt to the output log collection.
1375 #
1376 # log_writer_finish()
1377 #     Closes the arv-put pipe and returns the output that it produces.
1378 #
1379 # log_writer_is_active()
1380 #     Returns a true value if there is currently a live arv-put
1381 #     process, false otherwise.
1382 #
1383 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1384
1385 sub log_writer_start($)
1386 {
1387   my $logfilename = shift;
1388   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1389                         'arv-put', '--portable-data-hash',
1390                         '--retries', '3',
1391                         '--filename', $logfilename,
1392                         '-');
1393 }
1394
1395 sub log_writer_send($)
1396 {
1397   my $txt = shift;
1398   print $log_pipe_in $txt;
1399 }
1400
1401 sub log_writer_finish()
1402 {
1403   return unless $log_pipe_pid;
1404
1405   close($log_pipe_in);
1406   my $arv_put_output;
1407
1408   my $s = IO::Select->new($log_pipe_out);
1409   if ($s->can_read(120)) {
1410     sysread($log_pipe_out, $arv_put_output, 1024);
1411     chomp($arv_put_output);
1412   } else {
1413     Log (undef, "timed out reading from 'arv-put'");
1414   }
1415
1416   waitpid($log_pipe_pid, 0);
1417   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1418   if ($?) {
1419     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1420   }
1421
1422   return $arv_put_output;
1423 }
1424
1425 sub log_writer_is_active() {
1426   return $log_pipe_pid;
1427 }
1428
1429 sub Log                         # ($jobstep_id, $logmessage)
1430 {
1431   if ($_[1] =~ /\n/) {
1432     for my $line (split (/\n/, $_[1])) {
1433       Log ($_[0], $line);
1434     }
1435     return;
1436   }
1437   my $fh = select STDERR; $|=1; select $fh;
1438   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1439   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1440   $message .= "\n";
1441   my $datetime;
1442   if (log_writer_is_active() || -t STDERR) {
1443     my @gmtime = gmtime;
1444     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1445                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1446   }
1447   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1448
1449   if (log_writer_is_active()) {
1450     log_writer_send($datetime . " " . $message);
1451   }
1452 }
1453
1454
1455 sub croak
1456 {
1457   my ($package, $file, $line) = caller;
1458   my $message = "@_ at $file line $line\n";
1459   Log (undef, $message);
1460   freeze() if @jobstep_todo;
1461   collate_output() if @jobstep_todo;
1462   cleanup();
1463   save_meta();
1464   die;
1465 }
1466
1467
1468 sub cleanup
1469 {
1470   return unless $Job;
1471   if ($Job->{'state'} eq 'Cancelled') {
1472     $Job->update_attributes('finished_at' => scalar gmtime);
1473   } else {
1474     $Job->update_attributes('state' => 'Failed');
1475   }
1476 }
1477
1478
1479 sub save_meta
1480 {
1481   my $justcheckpoint = shift; # false if this will be the last meta saved
1482   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1483   return unless log_writer_is_active();
1484
1485   my $loglocator = log_writer_finish();
1486   Log (undef, "log manifest is $loglocator");
1487   $Job->{'log'} = $loglocator;
1488   $Job->update_attributes('log', $loglocator);
1489 }
1490
1491
1492 sub freeze_if_want_freeze
1493 {
1494   if ($main::please_freeze)
1495   {
1496     release_allocation();
1497     if (@_)
1498     {
1499       # kill some srun procs before freeze+stop
1500       map { $proc{$_} = {} } @_;
1501       while (%proc)
1502       {
1503         killem (keys %proc);
1504         select (undef, undef, undef, 0.1);
1505         my $died;
1506         while (($died = waitpid (-1, WNOHANG)) > 0)
1507         {
1508           delete $proc{$died};
1509         }
1510       }
1511     }
1512     freeze();
1513     collate_output();
1514     cleanup();
1515     save_meta();
1516     exit 1;
1517   }
1518 }
1519
1520
1521 sub freeze
1522 {
1523   Log (undef, "Freeze not implemented");
1524   return;
1525 }
1526
1527
1528 sub thaw
1529 {
1530   croak ("Thaw not implemented");
1531 }
1532
1533
1534 sub freezequote
1535 {
1536   my $s = shift;
1537   $s =~ s/\\/\\\\/g;
1538   $s =~ s/\n/\\n/g;
1539   return $s;
1540 }
1541
1542
1543 sub freezeunquote
1544 {
1545   my $s = shift;
1546   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1547   return $s;
1548 }
1549
1550
1551 sub srun
1552 {
1553   my $srunargs = shift;
1554   my $execargs = shift;
1555   my $opts = shift || {};
1556   my $stdin = shift;
1557   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1558   print STDERR (join (" ",
1559                       map { / / ? "'$_'" : $_ }
1560                       (@$args)),
1561                 "\n")
1562       if $ENV{CRUNCH_DEBUG};
1563
1564   if (defined $stdin) {
1565     my $child = open STDIN, "-|";
1566     defined $child or die "no fork: $!";
1567     if ($child == 0) {
1568       print $stdin or die $!;
1569       close STDOUT or die $!;
1570       exit 0;
1571     }
1572   }
1573
1574   return system (@$args) if $opts->{fork};
1575
1576   exec @$args;
1577   warn "ENV size is ".length(join(" ",%ENV));
1578   die "exec failed: $!: @$args";
1579 }
1580
1581
1582 sub ban_node_by_slot {
1583   # Don't start any new jobsteps on this node for 60 seconds
1584   my $slotid = shift;
1585   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1586   $slot[$slotid]->{node}->{hold_count}++;
1587   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1588 }
1589
1590 sub must_lock_now
1591 {
1592   my ($lockfile, $error_message) = @_;
1593   open L, ">", $lockfile or croak("$lockfile: $!");
1594   if (!flock L, LOCK_EX|LOCK_NB) {
1595     croak("Can't lock $lockfile: $error_message\n");
1596   }
1597 }
1598
1599 sub find_docker_image {
1600   # Given a Keep locator, check to see if it contains a Docker image.
1601   # If so, return its stream name and Docker hash.
1602   # If not, return undef for both values.
1603   my $locator = shift;
1604   my ($streamname, $filename);
1605   my $image = retry_op(sub {
1606     $arv->{collections}->{get}->execute(uuid => $locator);
1607   });
1608   if ($image) {
1609     foreach my $line (split(/\n/, $image->{manifest_text})) {
1610       my @tokens = split(/\s+/, $line);
1611       next if (!@tokens);
1612       $streamname = shift(@tokens);
1613       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1614         if (defined($filename)) {
1615           return (undef, undef);  # More than one file in the Collection.
1616         } else {
1617           $filename = (split(/:/, $filedata, 3))[2];
1618         }
1619       }
1620     }
1621   }
1622   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1623     return ($streamname, $1);
1624   } else {
1625     return (undef, undef);
1626   }
1627 }
1628
1629 sub retry_count {
1630   # Calculate the number of times an operation should be retried,
1631   # assuming exponential backoff, and that we're willing to retry as
1632   # long as tasks have been running.  Enforce a minimum of 3 retries.
1633   my ($starttime, $endtime, $timediff, $retries);
1634   if (@jobstep) {
1635     $starttime = $jobstep[0]->{starttime};
1636     $endtime = $jobstep[-1]->{finishtime};
1637   }
1638   if (!defined($starttime)) {
1639     $timediff = 0;
1640   } elsif (!defined($endtime)) {
1641     $timediff = time - $starttime;
1642   } else {
1643     $timediff = ($endtime - $starttime) - (time - $endtime);
1644   }
1645   if ($timediff > 0) {
1646     $retries = int(log($timediff) / log(2));
1647   } else {
1648     $retries = 1;  # Use the minimum.
1649   }
1650   return ($retries > 3) ? $retries : 3;
1651 }
1652
1653 sub retry_op {
1654   # Given a function reference, call it with the remaining arguments.  If
1655   # it dies, retry it with exponential backoff until it succeeds, or until
1656   # the current retry_count is exhausted.
1657   my $operation = shift;
1658   my $retries = retry_count();
1659   foreach my $try_count (0..$retries) {
1660     my $next_try = time + (2 ** $try_count);
1661     my $result = eval { $operation->(@_); };
1662     if (!$@) {
1663       return $result;
1664     } elsif ($try_count < $retries) {
1665       my $sleep_time = $next_try - time;
1666       sleep($sleep_time) if ($sleep_time > 0);
1667     }
1668   }
1669   # Ensure the error message ends in a newline, so Perl doesn't add
1670   # retry_op's line number to it.
1671   chomp($@);
1672   die($@ . "\n");
1673 }
1674
1675 sub exit_status_s {
1676   # Given a $?, return a human-readable exit code string like "0" or
1677   # "1" or "0 with signal 1" or "1 with signal 11".
1678   my $exitcode = shift;
1679   my $s = $exitcode >> 8;
1680   if ($exitcode & 0x7f) {
1681     $s .= " with signal " . ($exitcode & 0x7f);
1682   }
1683   if ($exitcode & 0x80) {
1684     $s .= " with core dump";
1685   }
1686   return $s;
1687 }
1688
1689 __DATA__
1690 #!/usr/bin/perl
1691
1692 # checkout-and-build
1693
1694 use Fcntl ':flock';
1695 use File::Path qw( make_path );
1696
1697 my $destdir = $ENV{"CRUNCH_SRC"};
1698 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1699 my $repo = $ENV{"CRUNCH_SRC_URL"};
1700 my $task_work = $ENV{"TASK_WORK"};
1701
1702 for my $dir ($destdir, $task_work) {
1703     if ($dir) {
1704         make_path $dir;
1705         -e $dir or die "Failed to create temporary directory ($dir): $!";
1706     }
1707 }
1708
1709 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1710 flock L, LOCK_EX;
1711 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1712     if (@ARGV) {
1713         exec(@ARGV);
1714         die "Cannot exec `@ARGV`: $!";
1715     } else {
1716         exit 0;
1717     }
1718 }
1719
1720 unlink "$destdir.commit";
1721 open STDOUT, ">", "$destdir.log";
1722 open STDERR, ">&STDOUT";
1723
1724 mkdir $destdir;
1725 my @git_archive_data = <DATA>;
1726 if (@git_archive_data) {
1727   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1728   print TARX @git_archive_data;
1729   if(!close(TARX)) {
1730     die "'tar -C $destdir -xf -' exited $?: $!";
1731   }
1732 }
1733
1734 my $pwd;
1735 chomp ($pwd = `pwd`);
1736 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1737 mkdir $install_dir;
1738
1739 for my $src_path ("$destdir/arvados/sdk/python") {
1740   if (-d $src_path) {
1741     shell_or_die ("virtualenv", $install_dir);
1742     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1743   }
1744 }
1745
1746 if (-e "$destdir/crunch_scripts/install") {
1747     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1748 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1749     # Old version
1750     shell_or_die ("./tests/autotests.sh", $install_dir);
1751 } elsif (-e "./install.sh") {
1752     shell_or_die ("./install.sh", $install_dir);
1753 }
1754
1755 if ($commit) {
1756     unlink "$destdir.commit.new";
1757     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1758     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1759 }
1760
1761 close L;
1762
1763 if (@ARGV) {
1764     exec(@ARGV);
1765     die "Cannot exec `@ARGV`: $!";
1766 } else {
1767     exit 0;
1768 }
1769
1770 sub shell_or_die
1771 {
1772   if ($ENV{"DEBUG"}) {
1773     print STDERR "@_\n";
1774   }
1775   system (@_) == 0
1776       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1777 }
1778
1779 __DATA__