Merge remote-tracking branch 'origin/master' into 4031-fix-graph-connections
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Data::Dumper;
90 use Digest::MD5 qw(md5_hex);
91 use Getopt::Long;
92 use IPC::Open2;
93 use IO::Select;
94 use File::Temp;
95 use Fcntl ':flock';
96 use File::Path qw( make_path remove_tree );
97
98 use constant EX_TEMPFAIL => 75;
99
100 $ENV{"TMPDIR"} ||= "/tmp";
101 unless (defined $ENV{"CRUNCH_TMP"}) {
102   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
103   if ($ENV{"USER"} ne "crunch" && $< != 0) {
104     # use a tmp dir unique for my uid
105     $ENV{"CRUNCH_TMP"} .= "-$<";
106   }
107 }
108
109 # Create the tmp directory if it does not exist
110 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
111   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
112 }
113
114 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
115 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
116 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
117 mkdir ($ENV{"JOB_WORK"});
118
119 my $force_unlock;
120 my $git_dir;
121 my $jobspec;
122 my $job_api_token;
123 my $no_clear_tmp;
124 my $resume_stash;
125 GetOptions('force-unlock' => \$force_unlock,
126            'git-dir=s' => \$git_dir,
127            'job=s' => \$jobspec,
128            'job-api-token=s' => \$job_api_token,
129            'no-clear-tmp' => \$no_clear_tmp,
130            'resume-stash=s' => \$resume_stash,
131     );
132
133 if (defined $job_api_token) {
134   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
135 }
136
137 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
138 my $local_job = 0;
139
140
141 $SIG{'USR1'} = sub
142 {
143   $main::ENV{CRUNCH_DEBUG} = 1;
144 };
145 $SIG{'USR2'} = sub
146 {
147   $main::ENV{CRUNCH_DEBUG} = 0;
148 };
149
150
151
152 my $arv = Arvados->new('apiVersion' => 'v1');
153
154 my $Job;
155 my $job_id;
156 my $dbh;
157 my $sth;
158 my @jobstep;
159
160 my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
161
162 if ($jobspec =~ /^[-a-z\d]+$/)
163 {
164   # $jobspec is an Arvados UUID, not a JSON job specification
165   $Job = retry_op(sub {
166     $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
167   });
168   if (!$force_unlock) {
169     # Claim this job, and make sure nobody else does
170     eval { retry_op(sub {
171       # lock() sets is_locked_by_uuid and changes state to Running.
172       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
173     }); };
174     if ($@) {
175       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
176       exit EX_TEMPFAIL;
177     };
178   }
179 }
180 else
181 {
182   $Job = JSON::decode_json($jobspec);
183
184   if (!$resume_stash)
185   {
186     map { croak ("No $_ specified") unless $Job->{$_} }
187     qw(script script_version script_parameters);
188   }
189
190   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
191   $Job->{'started_at'} = gmtime;
192   $Job->{'state'} = 'Running';
193
194   $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
195 }
196 $job_id = $Job->{'uuid'};
197
198 my $keep_logfile = $job_id . '.log.txt';
199 log_writer_start($keep_logfile);
200
201 $Job->{'runtime_constraints'} ||= {};
202 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
203 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
204
205
206 Log (undef, "check slurm allocation");
207 my @slot;
208 my @node;
209 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
210 my @sinfo;
211 if (!$have_slurm)
212 {
213   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
214   push @sinfo, "$localcpus localhost";
215 }
216 if (exists $ENV{SLURM_NODELIST})
217 {
218   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
219 }
220 foreach (@sinfo)
221 {
222   my ($ncpus, $slurm_nodelist) = split;
223   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
224
225   my @nodelist;
226   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
227   {
228     my $nodelist = $1;
229     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
230     {
231       my $ranges = $1;
232       foreach (split (",", $ranges))
233       {
234         my ($a, $b);
235         if (/(\d+)-(\d+)/)
236         {
237           $a = $1;
238           $b = $2;
239         }
240         else
241         {
242           $a = $_;
243           $b = $_;
244         }
245         push @nodelist, map {
246           my $n = $nodelist;
247           $n =~ s/\[[-,\d]+\]/$_/;
248           $n;
249         } ($a..$b);
250       }
251     }
252     else
253     {
254       push @nodelist, $nodelist;
255     }
256   }
257   foreach my $nodename (@nodelist)
258   {
259     Log (undef, "node $nodename - $ncpus slots");
260     my $node = { name => $nodename,
261                  ncpus => $ncpus,
262                  losing_streak => 0,
263                  hold_until => 0 };
264     foreach my $cpu (1..$ncpus)
265     {
266       push @slot, { node => $node,
267                     cpu => $cpu };
268     }
269   }
270   push @node, @nodelist;
271 }
272
273
274
275 # Ensure that we get one jobstep running on each allocated node before
276 # we start overloading nodes with concurrent steps
277
278 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
279
280
281 $Job->update_attributes(
282   'tasks_summary' => { 'failed' => 0,
283                        'todo' => 1,
284                        'running' => 0,
285                        'done' => 0 });
286
287 Log (undef, "start");
288 $SIG{'INT'} = sub { $main::please_freeze = 1; };
289 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
290 $SIG{'TERM'} = \&croak;
291 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
292 $SIG{'ALRM'} = sub { $main::please_info = 1; };
293 $SIG{'CONT'} = sub { $main::please_continue = 1; };
294 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
295
296 $main::please_freeze = 0;
297 $main::please_info = 0;
298 $main::please_continue = 0;
299 $main::please_refresh = 0;
300 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
301
302 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
303 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
304 $ENV{"JOB_UUID"} = $job_id;
305
306
307 my @jobstep_todo = ();
308 my @jobstep_done = ();
309 my @jobstep_tomerge = ();
310 my $jobstep_tomerge_level = 0;
311 my $squeue_checked;
312 my $squeue_kill_checked;
313 my $output_in_keep = 0;
314 my $latest_refresh = scalar time;
315
316
317
318 if (defined $Job->{thawedfromkey})
319 {
320   thaw ($Job->{thawedfromkey});
321 }
322 else
323 {
324   my $first_task = retry_op(sub {
325     $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
326       'job_uuid' => $Job->{'uuid'},
327       'sequence' => 0,
328       'qsequence' => 0,
329       'parameters' => {},
330     });
331   });
332   push @jobstep, { 'level' => 0,
333                    'failures' => 0,
334                    'arvados_task' => $first_task,
335                  };
336   push @jobstep_todo, 0;
337 }
338
339
340 if (!$have_slurm)
341 {
342   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
343 }
344
345
346 my $build_script;
347 do {
348   local $/ = undef;
349   $build_script = <DATA>;
350 };
351 my $nodelist = join(",", @node);
352
353 if (!defined $no_clear_tmp) {
354   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
355   Log (undef, "Clean work dirs");
356
357   my $cleanpid = fork();
358   if ($cleanpid == 0)
359   {
360     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
361           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
362     exit (1);
363   }
364   while (1)
365   {
366     last if $cleanpid == waitpid (-1, WNOHANG);
367     freeze_if_want_freeze ($cleanpid);
368     select (undef, undef, undef, 0.1);
369   }
370   Log (undef, "Cleanup command exited ".exit_status_s($?));
371 }
372
373
374 my $git_archive;
375 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
376   # If script_version looks like an absolute path, *and* the --git-dir
377   # argument was not given -- which implies we were not invoked by
378   # crunch-dispatch -- we will use the given path as a working
379   # directory instead of resolving script_version to a git commit (or
380   # doing anything else with git).
381   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
382   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
383 }
384 else {
385   # Resolve the given script_version to a git commit sha1. Also, if
386   # the repository is remote, clone it into our local filesystem: this
387   # ensures "git archive" will work, and is necessary to reliably
388   # resolve a symbolic script_version like "master^".
389   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
390
391   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
392
393   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
394
395   # If we're running under crunch-dispatch, it will have already
396   # pulled the appropriate source tree into its own repository, and
397   # given us that repo's path as $git_dir.
398   #
399   # If we're running a "local" job, we might have to fetch content
400   # from a remote repository.
401   #
402   # (Currently crunch-dispatch gives a local path with --git-dir, but
403   # we might as well accept URLs there too in case it changes its
404   # mind.)
405   my $repo = $git_dir || $Job->{'repository'};
406
407   # Repository can be remote or local. If remote, we'll need to fetch it
408   # to a local dir before doing `git log` et al.
409   my $repo_location;
410
411   if ($repo =~ m{://|^[^/]*:}) {
412     # $repo is a git url we can clone, like git:// or https:// or
413     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
414     # not recognized here because distinguishing that from a local
415     # path is too fragile. If you really need something strange here,
416     # use the ssh:// form.
417     $repo_location = 'remote';
418   } elsif ($repo =~ m{^\.*/}) {
419     # $repo is a local path to a git index. We'll also resolve ../foo
420     # to ../foo/.git if the latter is a directory. To help
421     # disambiguate local paths from named hosted repositories, this
422     # form must be given as ./ or ../ if it's a relative path.
423     if (-d "$repo/.git") {
424       $repo = "$repo/.git";
425     }
426     $repo_location = 'local';
427   } else {
428     # $repo is none of the above. It must be the name of a hosted
429     # repository.
430     my $arv_repo_list = retry_op(sub {
431       $arv->{'repositories'}->{'list'}->execute(
432         'filters' => [['name','=',$repo]]);
433     });
434     my @repos_found = @{$arv_repo_list->{'items'}};
435     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
436     if ($n_found > 0) {
437       Log(undef, "Repository '$repo' -> "
438           . join(", ", map { $_->{'uuid'} } @repos_found));
439     }
440     if ($n_found != 1) {
441       croak("Error: Found $n_found repositories with name '$repo'.");
442     }
443     $repo = $repos_found[0]->{'fetch_url'};
444     $repo_location = 'remote';
445   }
446   Log(undef, "Using $repo_location repository '$repo'");
447   $ENV{"CRUNCH_SRC_URL"} = $repo;
448
449   # Resolve given script_version (we'll call that $treeish here) to a
450   # commit sha1 ($commit).
451   my $treeish = $Job->{'script_version'};
452   my $commit;
453   if ($repo_location eq 'remote') {
454     # We minimize excess object-fetching by re-using the same bare
455     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
456     # just keep adding remotes to it as needed.
457     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
458     my $gitcmd = "git --git-dir=\Q$local_repo\E";
459
460     # Set up our local repo for caching remote objects, making
461     # archives, etc.
462     if (!-d $local_repo) {
463       make_path($local_repo) or croak("Error: could not create $local_repo");
464     }
465     # This works (exits 0 and doesn't delete fetched objects) even
466     # if $local_repo is already initialized:
467     `$gitcmd init --bare`;
468     if ($?) {
469       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
470     }
471
472     # If $treeish looks like a hash (or abbrev hash) we look it up in
473     # our local cache first, since that's cheaper. (We don't want to
474     # do that with tags/branches though -- those change over time, so
475     # they should always be resolved by the remote repo.)
476     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
477       # Hide stderr because it's normal for this to fail:
478       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
479       if ($? == 0 &&
480           # Careful not to resolve a branch named abcdeff to commit 1234567:
481           $sha1 =~ /^$treeish/ &&
482           $sha1 =~ /^([0-9a-f]{40})$/s) {
483         $commit = $1;
484         Log(undef, "Commit $commit already present in $local_repo");
485       }
486     }
487
488     if (!defined $commit) {
489       # If $treeish isn't just a hash or abbrev hash, or isn't here
490       # yet, we need to fetch the remote to resolve it correctly.
491
492       # First, remove all local heads. This prevents a name that does
493       # not exist on the remote from resolving to (or colliding with)
494       # a previously fetched branch or tag (possibly from a different
495       # remote).
496       remove_tree("$local_repo/refs/heads", {keep_root => 1});
497
498       Log(undef, "Fetching objects from $repo to $local_repo");
499       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
500       if ($?) {
501         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
502       }
503     }
504
505     # Now that the data is all here, we will use our local repo for
506     # the rest of our git activities.
507     $repo = $local_repo;
508   }
509
510   my $gitcmd = "git --git-dir=\Q$repo\E";
511   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
512   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
513     croak("`$gitcmd rev-list` exited "
514           .exit_status_s($?)
515           .", '$treeish' not found. Giving up.");
516   }
517   $commit = $1;
518   Log(undef, "Version $treeish is commit $commit");
519
520   if ($commit ne $Job->{'script_version'}) {
521     # Record the real commit id in the database, frozentokey, logs,
522     # etc. -- instead of an abbreviation or a branch name which can
523     # become ambiguous or point to a different commit in the future.
524     if (!$Job->update_attributes('script_version' => $commit)) {
525       croak("Error: failed to update job's script_version attribute");
526     }
527   }
528
529   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
530   $git_archive = `$gitcmd archive ''\Q$commit\E`;
531   if ($?) {
532     croak("Error: $gitcmd archive exited ".exit_status_s($?));
533   }
534 }
535
536 if (!defined $git_archive) {
537   Log(undef, "Skip install phase (no git archive)");
538   if ($have_slurm) {
539     Log(undef, "Warning: This probably means workers have no source tree!");
540   }
541 }
542 else {
543   Log(undef, "Run install script on all workers");
544
545   my @srunargs = ("srun",
546                   "--nodelist=$nodelist",
547                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
548   my @execargs = ("sh", "-c",
549                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
550
551   my $installpid = fork();
552   if ($installpid == 0)
553   {
554     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
555     exit (1);
556   }
557   while (1)
558   {
559     last if $installpid == waitpid (-1, WNOHANG);
560     freeze_if_want_freeze ($installpid);
561     select (undef, undef, undef, 0.1);
562   }
563   Log (undef, "Install script exited ".exit_status_s($?));
564 }
565
566 if (!$have_slurm)
567 {
568   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
569   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
570 }
571
572 # If this job requires a Docker image, install that.
573 my $docker_bin = "/usr/bin/docker.io";
574 my ($docker_locator, $docker_stream, $docker_hash);
575 if ($docker_locator = $Job->{docker_image_locator}) {
576   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
577   if (!$docker_hash)
578   {
579     croak("No Docker image hash found from locator $docker_locator");
580   }
581   $docker_stream =~ s/^\.//;
582   my $docker_install_script = qq{
583 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
584     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
585 fi
586 };
587   my $docker_pid = fork();
588   if ($docker_pid == 0)
589   {
590     srun (["srun", "--nodelist=" . join(',', @node)],
591           ["/bin/sh", "-ec", $docker_install_script]);
592     exit ($?);
593   }
594   while (1)
595   {
596     last if $docker_pid == waitpid (-1, WNOHANG);
597     freeze_if_want_freeze ($docker_pid);
598     select (undef, undef, undef, 0.1);
599   }
600   if ($? != 0)
601   {
602     croak("Installing Docker image from $docker_locator exited "
603           .exit_status_s($?));
604   }
605 }
606
607 foreach (qw (script script_version script_parameters runtime_constraints))
608 {
609   Log (undef,
610        "$_ " .
611        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
612 }
613 foreach (split (/\n/, $Job->{knobs}))
614 {
615   Log (undef, "knob " . $_);
616 }
617
618
619
620 $main::success = undef;
621
622
623
624 ONELEVEL:
625
626 my $thisround_succeeded = 0;
627 my $thisround_failed = 0;
628 my $thisround_failed_multiple = 0;
629
630 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
631                        or $a <=> $b } @jobstep_todo;
632 my $level = $jobstep[$jobstep_todo[0]]->{level};
633 Log (undef, "start level $level");
634
635
636
637 my %proc;
638 my @freeslot = (0..$#slot);
639 my @holdslot;
640 my %reader;
641 my $progress_is_dirty = 1;
642 my $progress_stats_updated = 0;
643
644 update_progress_stats();
645
646
647
648 THISROUND:
649 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
650 {
651   my $id = $jobstep_todo[$todo_ptr];
652   my $Jobstep = $jobstep[$id];
653   if ($Jobstep->{level} != $level)
654   {
655     next;
656   }
657
658   pipe $reader{$id}, "writer" or croak ($!);
659   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
660   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
661
662   my $childslot = $freeslot[0];
663   my $childnode = $slot[$childslot]->{node};
664   my $childslotname = join (".",
665                             $slot[$childslot]->{node}->{name},
666                             $slot[$childslot]->{cpu});
667   my $childpid = fork();
668   if ($childpid == 0)
669   {
670     $SIG{'INT'} = 'DEFAULT';
671     $SIG{'QUIT'} = 'DEFAULT';
672     $SIG{'TERM'} = 'DEFAULT';
673
674     foreach (values (%reader))
675     {
676       close($_);
677     }
678     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
679     open(STDOUT,">&writer");
680     open(STDERR,">&writer");
681
682     undef $dbh;
683     undef $sth;
684
685     delete $ENV{"GNUPGHOME"};
686     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
687     $ENV{"TASK_QSEQUENCE"} = $id;
688     $ENV{"TASK_SEQUENCE"} = $level;
689     $ENV{"JOB_SCRIPT"} = $Job->{script};
690     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
691       $param =~ tr/a-z/A-Z/;
692       $ENV{"JOB_PARAMETER_$param"} = $value;
693     }
694     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
695     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
696     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
697     $ENV{"HOME"} = $ENV{"TASK_WORK"};
698     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
699     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
700     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
701     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
702
703     $ENV{"GZIP"} = "-n";
704
705     my @srunargs = (
706       "srun",
707       "--nodelist=".$childnode->{name},
708       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
709       "--job-name=$job_id.$id.$$",
710         );
711     my $build_script_to_send = "";
712     my $command =
713         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
714         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
715         ."&& cd $ENV{CRUNCH_TMP} ";
716     if ($build_script)
717     {
718       $build_script_to_send = $build_script;
719       $command .=
720           "&& perl -";
721     }
722     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
723     if ($docker_hash)
724     {
725       my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
726       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
727       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
728
729       # Dynamically configure the container to use the host system as its
730       # DNS server.  Get the host's global addresses from the ip command,
731       # and turn them into docker --dns options using gawk.
732       $command .=
733           q{$(ip -o address show scope global |
734               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
735
736       # The source tree and $destdir directory (which we have
737       # installed on the worker host) are available in the container,
738       # under the same path.
739       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
740       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
741
742       # Currently, we make arv-mount's mount point appear at /keep
743       # inside the container (instead of using the same path as the
744       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
745       # crunch scripts and utilities must not rely on this. They must
746       # use $TASK_KEEPMOUNT.
747       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
748       $ENV{TASK_KEEPMOUNT} = "/keep";
749
750       # TASK_WORK is a plain docker data volume: it starts out empty,
751       # is writable, and persists until no containers use it any
752       # more. We don't use --volumes-from to share it with other
753       # containers: it is only accessible to this task, and it goes
754       # away when this task stops.
755       $command .= "--volume=\Q$ENV{TASK_WORK}\E ";
756
757       # JOB_WORK is also a plain docker data volume for now. TODO:
758       # Share a single JOB_WORK volume across all task containers on a
759       # given worker node, and delete it when the job ends (and, in
760       # case that doesn't work, when the next job starts).
761       $command .= "--volume=\Q$ENV{JOB_WORK}\E ";
762
763       while (my ($env_key, $env_val) = each %ENV)
764       {
765         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
766           $command .= "--env=\Q$env_key=$env_val\E ";
767         }
768       }
769       $command .= "--env=\QHOME=$ENV{HOME}\E ";
770       $command .= "\Q$docker_hash\E ";
771       $command .= "stdbuf --output=0 --error=0 ";
772       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
773     } else {
774       # Non-docker run
775       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
776       $command .= "stdbuf --output=0 --error=0 ";
777       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
778     }
779
780     my @execargs = ('bash', '-c', $command);
781     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
782     # exec() failed, we assume nothing happened.
783     die "srun() failed on build script\n";
784   }
785   close("writer");
786   if (!defined $childpid)
787   {
788     close $reader{$id};
789     delete $reader{$id};
790     next;
791   }
792   shift @freeslot;
793   $proc{$childpid} = { jobstep => $id,
794                        time => time,
795                        slot => $childslot,
796                        jobstepname => "$job_id.$id.$childpid",
797                      };
798   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
799   $slot[$childslot]->{pid} = $childpid;
800
801   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
802   Log ($id, "child $childpid started on $childslotname");
803   $Jobstep->{starttime} = time;
804   $Jobstep->{node} = $childnode->{name};
805   $Jobstep->{slotindex} = $childslot;
806   delete $Jobstep->{stderr};
807   delete $Jobstep->{finishtime};
808
809   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
810   $Jobstep->{'arvados_task'}->save;
811
812   splice @jobstep_todo, $todo_ptr, 1;
813   --$todo_ptr;
814
815   $progress_is_dirty = 1;
816
817   while (!@freeslot
818          ||
819          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
820   {
821     last THISROUND if $main::please_freeze;
822     if ($main::please_info)
823     {
824       $main::please_info = 0;
825       freeze();
826       collate_output();
827       save_meta(1);
828       update_progress_stats();
829     }
830     my $gotsome
831         = readfrompipes ()
832         + reapchildren ();
833     if (!$gotsome)
834     {
835       check_refresh_wanted();
836       check_squeue();
837       update_progress_stats();
838       select (undef, undef, undef, 0.1);
839     }
840     elsif (time - $progress_stats_updated >= 30)
841     {
842       update_progress_stats();
843     }
844     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
845         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
846     {
847       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
848           .($thisround_failed+$thisround_succeeded)
849           .") -- giving up on this round";
850       Log (undef, $message);
851       last THISROUND;
852     }
853
854     # move slots from freeslot to holdslot (or back to freeslot) if necessary
855     for (my $i=$#freeslot; $i>=0; $i--) {
856       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
857         push @holdslot, (splice @freeslot, $i, 1);
858       }
859     }
860     for (my $i=$#holdslot; $i>=0; $i--) {
861       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
862         push @freeslot, (splice @holdslot, $i, 1);
863       }
864     }
865
866     # give up if no nodes are succeeding
867     if (!grep { $_->{node}->{losing_streak} == 0 &&
868                     $_->{node}->{hold_count} < 4 } @slot) {
869       my $message = "Every node has failed -- giving up on this round";
870       Log (undef, $message);
871       last THISROUND;
872     }
873   }
874 }
875
876
877 push @freeslot, splice @holdslot;
878 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
879
880
881 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
882 while (%proc)
883 {
884   if ($main::please_continue) {
885     $main::please_continue = 0;
886     goto THISROUND;
887   }
888   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
889   readfrompipes ();
890   if (!reapchildren())
891   {
892     check_refresh_wanted();
893     check_squeue();
894     update_progress_stats();
895     select (undef, undef, undef, 0.1);
896     killem (keys %proc) if $main::please_freeze;
897   }
898 }
899
900 update_progress_stats();
901 freeze_if_want_freeze();
902
903
904 if (!defined $main::success)
905 {
906   if (@jobstep_todo &&
907       $thisround_succeeded == 0 &&
908       ($thisround_failed == 0 || $thisround_failed > 4))
909   {
910     my $message = "stop because $thisround_failed tasks failed and none succeeded";
911     Log (undef, $message);
912     $main::success = 0;
913   }
914   if (!@jobstep_todo)
915   {
916     $main::success = 1;
917   }
918 }
919
920 goto ONELEVEL if !defined $main::success;
921
922
923 release_allocation();
924 freeze();
925 my $collated_output = &collate_output();
926
927 if (!$collated_output) {
928   Log(undef, "output undef");
929 }
930 else {
931   eval {
932     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
933         or die "failed to get collated manifest: $!";
934     my $orig_manifest_text = '';
935     while (my $manifest_line = <$orig_manifest>) {
936       $orig_manifest_text .= $manifest_line;
937     }
938     my $output = retry_op(sub {
939       $arv->{'collections'}->{'create'}->execute(
940         'collection' => {'manifest_text' => $orig_manifest_text});
941     });
942     Log(undef, "output uuid " . $output->{uuid});
943     Log(undef, "output hash " . $output->{portable_data_hash});
944     $Job->update_attributes('output' => $output->{portable_data_hash});
945   };
946   if ($@) {
947     Log (undef, "Failed to register output manifest: $@");
948   }
949 }
950
951 Log (undef, "finish");
952
953 save_meta();
954
955 my $final_state;
956 if ($collated_output && $main::success) {
957   $final_state = 'Complete';
958 } else {
959   $final_state = 'Failed';
960 }
961 $Job->update_attributes('state' => $final_state);
962
963 exit (($final_state eq 'Complete') ? 0 : 1);
964
965
966
967 sub update_progress_stats
968 {
969   $progress_stats_updated = time;
970   return if !$progress_is_dirty;
971   my ($todo, $done, $running) = (scalar @jobstep_todo,
972                                  scalar @jobstep_done,
973                                  scalar @slot - scalar @freeslot - scalar @holdslot);
974   $Job->{'tasks_summary'} ||= {};
975   $Job->{'tasks_summary'}->{'todo'} = $todo;
976   $Job->{'tasks_summary'}->{'done'} = $done;
977   $Job->{'tasks_summary'}->{'running'} = $running;
978   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
979   Log (undef, "status: $done done, $running running, $todo todo");
980   $progress_is_dirty = 0;
981 }
982
983
984
985 sub reapchildren
986 {
987   my $pid = waitpid (-1, WNOHANG);
988   return 0 if $pid <= 0;
989
990   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
991                   . "."
992                   . $slot[$proc{$pid}->{slot}]->{cpu});
993   my $jobstepid = $proc{$pid}->{jobstep};
994   my $elapsed = time - $proc{$pid}->{time};
995   my $Jobstep = $jobstep[$jobstepid];
996
997   my $childstatus = $?;
998   my $exitvalue = $childstatus >> 8;
999   my $exitinfo = "exit ".exit_status_s($childstatus);
1000   $Jobstep->{'arvados_task'}->reload;
1001   my $task_success = $Jobstep->{'arvados_task'}->{success};
1002
1003   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1004
1005   if (!defined $task_success) {
1006     # task did not indicate one way or the other --> fail
1007     $Jobstep->{'arvados_task'}->{success} = 0;
1008     $Jobstep->{'arvados_task'}->save;
1009     $task_success = 0;
1010   }
1011
1012   if (!$task_success)
1013   {
1014     my $temporary_fail;
1015     $temporary_fail ||= $Jobstep->{node_fail};
1016     $temporary_fail ||= ($exitvalue == 111);
1017
1018     ++$thisround_failed;
1019     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1020
1021     # Check for signs of a failed or misconfigured node
1022     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1023         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1024       # Don't count this against jobstep failure thresholds if this
1025       # node is already suspected faulty and srun exited quickly
1026       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1027           $elapsed < 5) {
1028         Log ($jobstepid, "blaming failure on suspect node " .
1029              $slot[$proc{$pid}->{slot}]->{node}->{name});
1030         $temporary_fail ||= 1;
1031       }
1032       ban_node_by_slot($proc{$pid}->{slot});
1033     }
1034
1035     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1036                              ++$Jobstep->{'failures'},
1037                              $temporary_fail ? 'temporary ' : 'permanent',
1038                              $elapsed));
1039
1040     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1041       # Give up on this task, and the whole job
1042       $main::success = 0;
1043       $main::please_freeze = 1;
1044     }
1045     # Put this task back on the todo queue
1046     push @jobstep_todo, $jobstepid;
1047     $Job->{'tasks_summary'}->{'failed'}++;
1048   }
1049   else
1050   {
1051     ++$thisround_succeeded;
1052     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1053     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1054     push @jobstep_done, $jobstepid;
1055     Log ($jobstepid, "success in $elapsed seconds");
1056   }
1057   $Jobstep->{exitcode} = $childstatus;
1058   $Jobstep->{finishtime} = time;
1059   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1060   $Jobstep->{'arvados_task'}->save;
1061   process_stderr ($jobstepid, $task_success);
1062   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1063
1064   close $reader{$jobstepid};
1065   delete $reader{$jobstepid};
1066   delete $slot[$proc{$pid}->{slot}]->{pid};
1067   push @freeslot, $proc{$pid}->{slot};
1068   delete $proc{$pid};
1069
1070   if ($task_success) {
1071     # Load new tasks
1072     my $newtask_list = [];
1073     my $newtask_results;
1074     do {
1075       $newtask_results = retry_op(sub {
1076         $arv->{'job_tasks'}->{'list'}->execute(
1077           'where' => {
1078             'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1079           },
1080           'order' => 'qsequence',
1081           'offset' => scalar(@$newtask_list),
1082         );
1083       });
1084       push(@$newtask_list, @{$newtask_results->{items}});
1085     } while (@{$newtask_results->{items}});
1086     foreach my $arvados_task (@$newtask_list) {
1087       my $jobstep = {
1088         'level' => $arvados_task->{'sequence'},
1089         'failures' => 0,
1090         'arvados_task' => $arvados_task
1091       };
1092       push @jobstep, $jobstep;
1093       push @jobstep_todo, $#jobstep;
1094     }
1095   }
1096
1097   $progress_is_dirty = 1;
1098   1;
1099 }
1100
1101 sub check_refresh_wanted
1102 {
1103   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1104   if (@stat && $stat[9] > $latest_refresh) {
1105     $latest_refresh = scalar time;
1106     my $Job2 = retry_op(sub {
1107       $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1108     });
1109     for my $attr ('cancelled_at',
1110                   'cancelled_by_user_uuid',
1111                   'cancelled_by_client_uuid',
1112                   'state') {
1113       $Job->{$attr} = $Job2->{$attr};
1114     }
1115     if ($Job->{'state'} ne "Running") {
1116       if ($Job->{'state'} eq "Cancelled") {
1117         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1118       } else {
1119         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1120       }
1121       $main::success = 0;
1122       $main::please_freeze = 1;
1123     }
1124   }
1125 }
1126
1127 sub check_squeue
1128 {
1129   # return if the kill list was checked <4 seconds ago
1130   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1131   {
1132     return;
1133   }
1134   $squeue_kill_checked = time;
1135
1136   # use killem() on procs whose killtime is reached
1137   for (keys %proc)
1138   {
1139     if (exists $proc{$_}->{killtime}
1140         && $proc{$_}->{killtime} <= time)
1141     {
1142       killem ($_);
1143     }
1144   }
1145
1146   # return if the squeue was checked <60 seconds ago
1147   if (defined $squeue_checked && $squeue_checked > time - 60)
1148   {
1149     return;
1150   }
1151   $squeue_checked = time;
1152
1153   if (!$have_slurm)
1154   {
1155     # here is an opportunity to check for mysterious problems with local procs
1156     return;
1157   }
1158
1159   # get a list of steps still running
1160   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1161   chop @squeue;
1162   if ($squeue[-1] ne "ok")
1163   {
1164     return;
1165   }
1166   pop @squeue;
1167
1168   # which of my jobsteps are running, according to squeue?
1169   my %ok;
1170   foreach (@squeue)
1171   {
1172     if (/^(\d+)\.(\d+) (\S+)/)
1173     {
1174       if ($1 eq $ENV{SLURM_JOBID})
1175       {
1176         $ok{$3} = 1;
1177       }
1178     }
1179   }
1180
1181   # which of my active child procs (>60s old) were not mentioned by squeue?
1182   foreach (keys %proc)
1183   {
1184     if ($proc{$_}->{time} < time - 60
1185         && !exists $ok{$proc{$_}->{jobstepname}}
1186         && !exists $proc{$_}->{killtime})
1187     {
1188       # kill this proc if it hasn't exited in 30 seconds
1189       $proc{$_}->{killtime} = time + 30;
1190     }
1191   }
1192 }
1193
1194
1195 sub release_allocation
1196 {
1197   if ($have_slurm)
1198   {
1199     Log (undef, "release job allocation");
1200     system "scancel $ENV{SLURM_JOBID}";
1201   }
1202 }
1203
1204
1205 sub readfrompipes
1206 {
1207   my $gotsome = 0;
1208   foreach my $job (keys %reader)
1209   {
1210     my $buf;
1211     while (0 < sysread ($reader{$job}, $buf, 8192))
1212     {
1213       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1214       $jobstep[$job]->{stderr} .= $buf;
1215       preprocess_stderr ($job);
1216       if (length ($jobstep[$job]->{stderr}) > 16384)
1217       {
1218         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1219       }
1220       $gotsome = 1;
1221     }
1222   }
1223   return $gotsome;
1224 }
1225
1226
1227 sub preprocess_stderr
1228 {
1229   my $job = shift;
1230
1231   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1232     my $line = $1;
1233     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1234     Log ($job, "stderr $line");
1235     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1236       # whoa.
1237       $main::please_freeze = 1;
1238     }
1239     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1240       $jobstep[$job]->{node_fail} = 1;
1241       ban_node_by_slot($jobstep[$job]->{slotindex});
1242     }
1243   }
1244 }
1245
1246
1247 sub process_stderr
1248 {
1249   my $job = shift;
1250   my $task_success = shift;
1251   preprocess_stderr ($job);
1252
1253   map {
1254     Log ($job, "stderr $_");
1255   } split ("\n", $jobstep[$job]->{stderr});
1256 }
1257
1258 sub fetch_block
1259 {
1260   my $hash = shift;
1261   my ($keep, $child_out, $output_block);
1262
1263   my $cmd = "arv-get \Q$hash\E";
1264   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1265   $output_block = '';
1266   while (1) {
1267     my $buf;
1268     my $bytes = sysread($keep, $buf, 1024 * 1024);
1269     if (!defined $bytes) {
1270       die "reading from arv-get: $!";
1271     } elsif ($bytes == 0) {
1272       # sysread returns 0 at the end of the pipe.
1273       last;
1274     } else {
1275       # some bytes were read into buf.
1276       $output_block .= $buf;
1277     }
1278   }
1279   close $keep;
1280   return $output_block;
1281 }
1282
1283 sub collate_output
1284 {
1285   Log (undef, "collate");
1286
1287   my ($child_out, $child_in);
1288   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1289                   '--retries', retry_count());
1290   my $joboutput;
1291   for (@jobstep)
1292   {
1293     next if (!exists $_->{'arvados_task'}->{'output'} ||
1294              !$_->{'arvados_task'}->{'success'});
1295     my $output = $_->{'arvados_task'}->{output};
1296     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1297     {
1298       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1299       print $child_in $output;
1300     }
1301     elsif (@jobstep == 1)
1302     {
1303       $joboutput = $output;
1304       last;
1305     }
1306     elsif (defined (my $outblock = fetch_block ($output)))
1307     {
1308       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1309       print $child_in $outblock;
1310     }
1311     else
1312     {
1313       Log (undef, "XXX fetch_block($output) failed XXX");
1314       $main::success = 0;
1315     }
1316   }
1317   $child_in->close;
1318
1319   if (!defined $joboutput) {
1320     my $s = IO::Select->new($child_out);
1321     if ($s->can_read(120)) {
1322       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1323       chomp($joboutput);
1324       # TODO: Ensure exit status == 0.
1325     } else {
1326       Log (undef, "timed out reading from 'arv-put'");
1327     }
1328   }
1329   # TODO: kill $pid instead of waiting, now that we've decided to
1330   # ignore further output.
1331   waitpid($pid, 0);
1332
1333   return $joboutput;
1334 }
1335
1336
1337 sub killem
1338 {
1339   foreach (@_)
1340   {
1341     my $sig = 2;                # SIGINT first
1342     if (exists $proc{$_}->{"sent_$sig"} &&
1343         time - $proc{$_}->{"sent_$sig"} > 4)
1344     {
1345       $sig = 15;                # SIGTERM if SIGINT doesn't work
1346     }
1347     if (exists $proc{$_}->{"sent_$sig"} &&
1348         time - $proc{$_}->{"sent_$sig"} > 4)
1349     {
1350       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1351     }
1352     if (!exists $proc{$_}->{"sent_$sig"})
1353     {
1354       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1355       kill $sig, $_;
1356       select (undef, undef, undef, 0.1);
1357       if ($sig == 2)
1358       {
1359         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1360       }
1361       $proc{$_}->{"sent_$sig"} = time;
1362       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1363     }
1364   }
1365 }
1366
1367
1368 sub fhbits
1369 {
1370   my($bits);
1371   for (@_) {
1372     vec($bits,fileno($_),1) = 1;
1373   }
1374   $bits;
1375 }
1376
1377
1378 # Send log output to Keep via arv-put.
1379 #
1380 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1381 # $log_pipe_pid is the pid of the arv-put subprocess.
1382 #
1383 # The only functions that should access these variables directly are:
1384 #
1385 # log_writer_start($logfilename)
1386 #     Starts an arv-put pipe, reading data on stdin and writing it to
1387 #     a $logfilename file in an output collection.
1388 #
1389 # log_writer_send($txt)
1390 #     Writes $txt to the output log collection.
1391 #
1392 # log_writer_finish()
1393 #     Closes the arv-put pipe and returns the output that it produces.
1394 #
1395 # log_writer_is_active()
1396 #     Returns a true value if there is currently a live arv-put
1397 #     process, false otherwise.
1398 #
1399 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1400
1401 sub log_writer_start($)
1402 {
1403   my $logfilename = shift;
1404   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1405                         'arv-put', '--portable-data-hash',
1406                         '--retries', '3',
1407                         '--filename', $logfilename,
1408                         '-');
1409 }
1410
1411 sub log_writer_send($)
1412 {
1413   my $txt = shift;
1414   print $log_pipe_in $txt;
1415 }
1416
1417 sub log_writer_finish()
1418 {
1419   return unless $log_pipe_pid;
1420
1421   close($log_pipe_in);
1422   my $arv_put_output;
1423
1424   my $s = IO::Select->new($log_pipe_out);
1425   if ($s->can_read(120)) {
1426     sysread($log_pipe_out, $arv_put_output, 1024);
1427     chomp($arv_put_output);
1428   } else {
1429     Log (undef, "timed out reading from 'arv-put'");
1430   }
1431
1432   waitpid($log_pipe_pid, 0);
1433   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1434   if ($?) {
1435     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1436   }
1437
1438   return $arv_put_output;
1439 }
1440
1441 sub log_writer_is_active() {
1442   return $log_pipe_pid;
1443 }
1444
1445 sub Log                         # ($jobstep_id, $logmessage)
1446 {
1447   if ($_[1] =~ /\n/) {
1448     for my $line (split (/\n/, $_[1])) {
1449       Log ($_[0], $line);
1450     }
1451     return;
1452   }
1453   my $fh = select STDERR; $|=1; select $fh;
1454   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1455   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1456   $message .= "\n";
1457   my $datetime;
1458   if (log_writer_is_active() || -t STDERR) {
1459     my @gmtime = gmtime;
1460     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1461                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1462   }
1463   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1464
1465   if (log_writer_is_active()) {
1466     log_writer_send($datetime . " " . $message);
1467   }
1468 }
1469
1470
1471 sub croak
1472 {
1473   my ($package, $file, $line) = caller;
1474   my $message = "@_ at $file line $line\n";
1475   Log (undef, $message);
1476   freeze() if @jobstep_todo;
1477   collate_output() if @jobstep_todo;
1478   cleanup();
1479   save_meta();
1480   die;
1481 }
1482
1483
1484 sub cleanup
1485 {
1486   return unless $Job;
1487   if ($Job->{'state'} eq 'Cancelled') {
1488     $Job->update_attributes('finished_at' => scalar gmtime);
1489   } else {
1490     $Job->update_attributes('state' => 'Failed');
1491   }
1492 }
1493
1494
1495 sub save_meta
1496 {
1497   my $justcheckpoint = shift; # false if this will be the last meta saved
1498   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1499   return unless log_writer_is_active();
1500
1501   my $loglocator = log_writer_finish();
1502   Log (undef, "log manifest is $loglocator");
1503   $Job->{'log'} = $loglocator;
1504   $Job->update_attributes('log', $loglocator);
1505 }
1506
1507
1508 sub freeze_if_want_freeze
1509 {
1510   if ($main::please_freeze)
1511   {
1512     release_allocation();
1513     if (@_)
1514     {
1515       # kill some srun procs before freeze+stop
1516       map { $proc{$_} = {} } @_;
1517       while (%proc)
1518       {
1519         killem (keys %proc);
1520         select (undef, undef, undef, 0.1);
1521         my $died;
1522         while (($died = waitpid (-1, WNOHANG)) > 0)
1523         {
1524           delete $proc{$died};
1525         }
1526       }
1527     }
1528     freeze();
1529     collate_output();
1530     cleanup();
1531     save_meta();
1532     exit 1;
1533   }
1534 }
1535
1536
1537 sub freeze
1538 {
1539   Log (undef, "Freeze not implemented");
1540   return;
1541 }
1542
1543
1544 sub thaw
1545 {
1546   croak ("Thaw not implemented");
1547 }
1548
1549
1550 sub freezequote
1551 {
1552   my $s = shift;
1553   $s =~ s/\\/\\\\/g;
1554   $s =~ s/\n/\\n/g;
1555   return $s;
1556 }
1557
1558
1559 sub freezeunquote
1560 {
1561   my $s = shift;
1562   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1563   return $s;
1564 }
1565
1566
1567 sub srun
1568 {
1569   my $srunargs = shift;
1570   my $execargs = shift;
1571   my $opts = shift || {};
1572   my $stdin = shift;
1573   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1574
1575   $Data::Dumper::Terse = 1;
1576   $Data::Dumper::Indent = 0;
1577   my $show_cmd = Dumper($args);
1578   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1579   $show_cmd =~ s/\n/ /g;
1580   warn "starting: $show_cmd\n";
1581
1582   if (defined $stdin) {
1583     my $child = open STDIN, "-|";
1584     defined $child or die "no fork: $!";
1585     if ($child == 0) {
1586       print $stdin or die $!;
1587       close STDOUT or die $!;
1588       exit 0;
1589     }
1590   }
1591
1592   return system (@$args) if $opts->{fork};
1593
1594   exec @$args;
1595   warn "ENV size is ".length(join(" ",%ENV));
1596   die "exec failed: $!: @$args";
1597 }
1598
1599
1600 sub ban_node_by_slot {
1601   # Don't start any new jobsteps on this node for 60 seconds
1602   my $slotid = shift;
1603   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1604   $slot[$slotid]->{node}->{hold_count}++;
1605   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1606 }
1607
1608 sub must_lock_now
1609 {
1610   my ($lockfile, $error_message) = @_;
1611   open L, ">", $lockfile or croak("$lockfile: $!");
1612   if (!flock L, LOCK_EX|LOCK_NB) {
1613     croak("Can't lock $lockfile: $error_message\n");
1614   }
1615 }
1616
1617 sub find_docker_image {
1618   # Given a Keep locator, check to see if it contains a Docker image.
1619   # If so, return its stream name and Docker hash.
1620   # If not, return undef for both values.
1621   my $locator = shift;
1622   my ($streamname, $filename);
1623   my $image = retry_op(sub {
1624     $arv->{collections}->{get}->execute(uuid => $locator);
1625   });
1626   if ($image) {
1627     foreach my $line (split(/\n/, $image->{manifest_text})) {
1628       my @tokens = split(/\s+/, $line);
1629       next if (!@tokens);
1630       $streamname = shift(@tokens);
1631       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1632         if (defined($filename)) {
1633           return (undef, undef);  # More than one file in the Collection.
1634         } else {
1635           $filename = (split(/:/, $filedata, 3))[2];
1636         }
1637       }
1638     }
1639   }
1640   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1641     return ($streamname, $1);
1642   } else {
1643     return (undef, undef);
1644   }
1645 }
1646
1647 sub retry_count {
1648   # Calculate the number of times an operation should be retried,
1649   # assuming exponential backoff, and that we're willing to retry as
1650   # long as tasks have been running.  Enforce a minimum of 3 retries.
1651   my ($starttime, $endtime, $timediff, $retries);
1652   if (@jobstep) {
1653     $starttime = $jobstep[0]->{starttime};
1654     $endtime = $jobstep[-1]->{finishtime};
1655   }
1656   if (!defined($starttime)) {
1657     $timediff = 0;
1658   } elsif (!defined($endtime)) {
1659     $timediff = time - $starttime;
1660   } else {
1661     $timediff = ($endtime - $starttime) - (time - $endtime);
1662   }
1663   if ($timediff > 0) {
1664     $retries = int(log($timediff) / log(2));
1665   } else {
1666     $retries = 1;  # Use the minimum.
1667   }
1668   return ($retries > 3) ? $retries : 3;
1669 }
1670
1671 sub retry_op {
1672   # Given a function reference, call it with the remaining arguments.  If
1673   # it dies, retry it with exponential backoff until it succeeds, or until
1674   # the current retry_count is exhausted.
1675   my $operation = shift;
1676   my $retries = retry_count();
1677   foreach my $try_count (0..$retries) {
1678     my $next_try = time + (2 ** $try_count);
1679     my $result = eval { $operation->(@_); };
1680     if (!$@) {
1681       return $result;
1682     } elsif ($try_count < $retries) {
1683       my $sleep_time = $next_try - time;
1684       sleep($sleep_time) if ($sleep_time > 0);
1685     }
1686   }
1687   # Ensure the error message ends in a newline, so Perl doesn't add
1688   # retry_op's line number to it.
1689   chomp($@);
1690   die($@ . "\n");
1691 }
1692
1693 sub exit_status_s {
1694   # Given a $?, return a human-readable exit code string like "0" or
1695   # "1" or "0 with signal 1" or "1 with signal 11".
1696   my $exitcode = shift;
1697   my $s = $exitcode >> 8;
1698   if ($exitcode & 0x7f) {
1699     $s .= " with signal " . ($exitcode & 0x7f);
1700   }
1701   if ($exitcode & 0x80) {
1702     $s .= " with core dump";
1703   }
1704   return $s;
1705 }
1706
1707 __DATA__
1708 #!/usr/bin/perl
1709
1710 # checkout-and-build
1711
1712 use Fcntl ':flock';
1713 use File::Path qw( make_path remove_tree );
1714
1715 my $destdir = $ENV{"CRUNCH_SRC"};
1716 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1717 my $repo = $ENV{"CRUNCH_SRC_URL"};
1718 my $task_work = $ENV{"TASK_WORK"};
1719
1720 for my $dir ($destdir, $task_work) {
1721   if ($dir) {
1722     make_path $dir;
1723     -e $dir or die "Failed to create temporary directory ($dir): $!";
1724   }
1725 }
1726
1727 if ($task_work) {
1728   remove_tree($task_work, {keep_root => 1});
1729 }
1730
1731
1732 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1733 flock L, LOCK_EX;
1734 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1735     if (@ARGV) {
1736         exec(@ARGV);
1737         die "Cannot exec `@ARGV`: $!";
1738     } else {
1739         exit 0;
1740     }
1741 }
1742
1743 unlink "$destdir.commit";
1744 open STDERR_ORIG, ">&STDERR";
1745 open STDOUT, ">", "$destdir.log";
1746 open STDERR, ">&STDOUT";
1747
1748 mkdir $destdir;
1749 my @git_archive_data = <DATA>;
1750 if (@git_archive_data) {
1751   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1752   print TARX @git_archive_data;
1753   if(!close(TARX)) {
1754     die "'tar -C $destdir -xf -' exited $?: $!";
1755   }
1756 }
1757
1758 my $pwd;
1759 chomp ($pwd = `pwd`);
1760 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1761 mkdir $install_dir;
1762
1763 for my $src_path ("$destdir/arvados/sdk/python") {
1764   if (-d $src_path) {
1765     shell_or_die ("virtualenv", $install_dir);
1766     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1767   }
1768 }
1769
1770 if (-e "$destdir/crunch_scripts/install") {
1771     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1772 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1773     # Old version
1774     shell_or_die ("./tests/autotests.sh", $install_dir);
1775 } elsif (-e "./install.sh") {
1776     shell_or_die ("./install.sh", $install_dir);
1777 }
1778
1779 if ($commit) {
1780     unlink "$destdir.commit.new";
1781     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1782     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1783 }
1784
1785 close L;
1786
1787 if (@ARGV) {
1788     exec(@ARGV);
1789     die "Cannot exec `@ARGV`: $!";
1790 } else {
1791     exit 0;
1792 }
1793
1794 sub shell_or_die
1795 {
1796   if ($ENV{"DEBUG"}) {
1797     print STDERR "@_\n";
1798   }
1799   if (system (@_) != 0) {
1800     my $err = $!;
1801     my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
1802     open STDERR, ">&STDERR_ORIG";
1803     system ("cat $destdir.log >&2");
1804     die "@_ failed ($err): $exitstatus";
1805   }
1806 }
1807
1808 __DATA__